Coverage for src/couchers/servicers/requests.py: 93%

292 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-22 00:46 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy import exists 

7from sqlalchemy.orm import aliased 

8from sqlalchemy.sql import and_, func, or_ 

9 

10from couchers.constants import HOST_REQUEST_MIN_LENGTH_UTF16 

11from couchers.materialized_views import UserResponseRate 

12from couchers.metrics import ( 

13 account_age_on_host_request_create_histogram, 

14 host_request_first_response_histogram, 

15 host_request_responses_counter, 

16 host_requests_sent_counter, 

17 sent_messages_counter, 

18) 

19from couchers.models import ( 

20 Conversation, 

21 HostRequest, 

22 HostRequestFeedback, 

23 HostRequestQuality, 

24 HostRequestStatus, 

25 Message, 

26 MessageType, 

27 ModerationObjectType, 

28 RateLimitAction, 

29 User, 

30) 

31from couchers.moderation.utils import create_moderation 

32from couchers.notifications.notify import notify 

33from couchers.proto import conversations_pb2, notification_data_pb2, requests_pb2, requests_pb2_grpc 

34from couchers.rate_limits.check import process_rate_limits_and_check_abort 

35from couchers.rate_limits.definitions import RATE_LIMIT_HOURS 

36from couchers.servicers.api import response_rate_to_pb, user_model_to_pb 

37from couchers.sql import couchers_select as select 

38from couchers.utils import ( 

39 Timestamp_from_datetime, 

40 date_to_api, 

41 get_coordinates, 

42 now, 

43 parse_date, 

44 today_in_timezone, 

45) 

46 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Page size used by the paginated RPCs below when the request's `number` is not positive.
DEFAULT_PAGINATION_LENGTH = 10
# Upper bound the paginated RPCs below apply to any requested page size.
MAX_PAGE_SIZE = 50

51 

52 

# Lookup table from the HostRequestStatus model enum to the matching conversations_pb2 status value
# used in API responses.
hostrequeststatus2api = {
    HostRequestStatus.pending: conversations_pb2.HOST_REQUEST_STATUS_PENDING,
    HostRequestStatus.accepted: conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
    HostRequestStatus.rejected: conversations_pb2.HOST_REQUEST_STATUS_REJECTED,
    HostRequestStatus.confirmed: conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED,
    HostRequestStatus.cancelled: conversations_pb2.HOST_REQUEST_STATUS_CANCELLED,
}

60 

# Lookup table from the API's host request quality enum (requests_pb2) to the HostRequestQuality model enum.
# Looking up a value with no entry via `.get()` yields None.
hostrequestquality2sql = {
    # NOTE(review): an unspecified quality is stored as high_quality; this looks deliberate (no complaint
    # means a good request) but cannot be confirmed from this file -- TODO confirm.
    requests_pb2.HOST_REQUEST_QUALITY_UNSPECIFIED: HostRequestQuality.high_quality,
    # LOW and OKAY previously pointed at each other's model value.
    requests_pb2.HOST_REQUEST_QUALITY_LOW: HostRequestQuality.low_quality,
    requests_pb2.HOST_REQUEST_QUALITY_OKAY: HostRequestQuality.okay_quality,
}

66 

67 

def message_to_pb(message: Message):
    """
    Converts a Message model into a conversations_pb2.Message.

    Normal messages carry their text. Any other message carries `chat_created` and/or
    `host_request_status_changed` content, each set only when the message type matches.
    """
    sent_at = Timestamp_from_datetime(message.time)

    if message.is_normal_message:
        return conversations_pb2.Message(
            message_id=message.id,
            author_user_id=message.author_id,
            time=sent_at,
            text=conversations_pb2.MessageContentText(text=message.text),
        )

    # control message: fill in whichever content matches the message type, leave the other unset
    chat_created = None
    if message.message_type == MessageType.chat_created:
        chat_created = conversations_pb2.MessageContentChatCreated()

    status_changed = None
    if message.message_type == MessageType.host_request_status_changed:
        status_changed = conversations_pb2.MessageContentHostRequestStatusChanged(
            status=hostrequeststatus2api[message.host_request_status_target]
        )

    return conversations_pb2.Message(
        message_id=message.id,
        author_user_id=message.author_id,
        time=sent_at,
        chat_created=chat_created,
        host_request_status_changed=status_changed,
    )

97 

98 

def host_request_to_pb(host_request: HostRequest, session, context):
    """
    Converts a HostRequest model into a requests_pb2.HostRequest, as seen by the calling user.

    The `created` time is taken from the first message of the conversation, and `last_seen_message_id`
    is the surfer's or the host's marker depending on who `context.user_id` is.
    `need_host_request_feedback` is set when the caller is the host of a rejected request and has not
    left feedback for it yet.
    """
    conversation_messages = select(Message).where(Message.conversation_id == host_request.conversation_id)

    # lowest message id in the conversation
    first_message = session.execute(conversation_messages.order_by(Message.id.asc()).limit(1)).scalar_one()
    # highest message id in the conversation
    newest_message = session.execute(conversation_messages.order_by(Message.id.desc()).limit(1)).scalar_one()

    lat, lng = get_coordinates(host_request.hosting_location)

    caller_is_host = context.user_id == host_request.host_user_id
    if caller_is_host and host_request.status == HostRequestStatus.rejected:
        feedback_exists = session.execute(
            select(
                exists().where(
                    HostRequestFeedback.from_user_id == context.user_id,
                    HostRequestFeedback.host_request_id == host_request.conversation_id,
                )
            )
        ).scalar_one()
        need_feedback = not feedback_exists
    else:
        need_feedback = False

    if context.user_id == host_request.surfer_user_id:
        last_seen_message_id = host_request.surfer_last_seen_message_id
    else:
        last_seen_message_id = host_request.host_last_seen_message_id

    return requests_pb2.HostRequest(
        host_request_id=host_request.conversation_id,
        surfer_user_id=host_request.surfer_user_id,
        host_user_id=host_request.host_user_id,
        status=hostrequeststatus2api[host_request.status],
        created=Timestamp_from_datetime(first_message.time),
        from_date=date_to_api(host_request.from_date),
        to_date=date_to_api(host_request.to_date),
        last_seen_message_id=last_seen_message_id,
        latest_message=message_to_pb(newest_message),
        hosting_city=host_request.hosting_city,
        hosting_lat=lat,
        hosting_lng=lng,
        hosting_radius=host_request.hosting_radius,
        need_host_request_feedback=need_feedback,
    )

147 

148 

def _possibly_observe_first_response_time(session, host_request, user_id, response_type):
    """
    Records the host's first-response time in `host_request_first_response_histogram`.

    Must be called with the host's user id, before the host's new message is added: the observation is
    only made when the host has not authored any message in the conversation yet. The observed value is
    the number of seconds since the conversation was created, labelled with both genders and
    `response_type`.
    """
    assert host_request.host_user_id == user_id

    host_message_count = session.execute(
        select(func.count())
        .where(Message.conversation_id == host_request.conversation_id)
        .where(Message.author_id == user_id)
    ).scalar_one_or_none()

    # the host has written something before (or no count came back), so this is not a first response
    if host_message_count != 0:
        return

    host_gender = session.execute(select(User.gender).where(User.id == host_request.host_user_id)).scalar_one()
    surfer_gender = session.execute(select(User.gender).where(User.id == host_request.surfer_user_id)).scalar_one()

    seconds_to_respond = (now() - host_request.conversation.created).total_seconds()
    host_request_first_response_histogram.labels(host_gender, surfer_gender, response_type).observe(
        seconds_to_respond
    )

165 

166 

def _is_host_request_long_enough(text: str) -> bool:
    """
    Returns whether `text` is at least HOST_REQUEST_MIN_LENGTH_UTF16 UTF-16 code units long.

    Python's len(str) counts code points while Javascript's string.length counts UTF-16 code units,
    e.g. len("😀") == 1 but "😀".length == 2. Measuring in UTF-16 code units matches the frontend's
    validation.
    """
    # utf-16-le does not prepend a BOM, so the byte count is exactly two bytes per code unit
    encoded = text.encode("utf-16-le")
    code_units = len(encoded) // 2
    return code_units >= HOST_REQUEST_MIN_LENGTH_UTF16

173 

174 

175class Requests(requests_pb2_grpc.RequestsServicer): 

176 def CreateHostRequest(self, request, context, session): 

177 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

178 if not user.has_completed_profile: 

179 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_request") 

180 

181 if request.host_user_id == context.user_id: 

182 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_request_self") 

183 

184 # just to check host exists and is visible 

185 host = session.execute( 

186 select(User).where_users_visible(context).where(User.id == request.host_user_id) 

187 ).scalar_one_or_none() 

188 if not host: 

189 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

190 

191 from_date = parse_date(request.from_date) 

192 to_date = parse_date(request.to_date) 

193 

194 if not from_date or not to_date: 

195 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_date") 

196 

197 today = today_in_timezone(host.timezone) 

198 

199 # request starts from the past 

200 if from_date < today: 

201 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_before_today") 

202 

203 # from_date is not >= to_date 

204 if from_date >= to_date: 

205 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_to") 

206 

207 # No need to check today > to_date 

208 

209 if from_date - today > timedelta(days=365): 

210 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_one_year") 

211 

212 if to_date - from_date > timedelta(days=365): 

213 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_to_after_one_year") 

214 

215 # Check minimum length 

216 if not _is_host_request_long_enough(request.text): 

217 context.abort_with_error_code( 

218 grpc.StatusCode.INVALID_ARGUMENT, 

219 "host_request_too_short", 

220 substitutions={"chars": HOST_REQUEST_MIN_LENGTH_UTF16}, 

221 ) 

222 

223 # Check if user has been sending host requests excessively 

224 if process_rate_limits_and_check_abort( 

225 session=session, user_id=context.user_id, action=RateLimitAction.host_request 

226 ): 

227 context.abort_with_error_code( 

228 grpc.StatusCode.RESOURCE_EXHAUSTED, 

229 "host_request_rate_limit", 

230 substitutions={"hours": RATE_LIMIT_HOURS}, 

231 ) 

232 

233 conversation = Conversation() 

234 session.add(conversation) 

235 session.flush() 

236 

237 session.add( 

238 Message( 

239 conversation_id=conversation.id, 

240 author_id=context.user_id, 

241 message_type=MessageType.chat_created, 

242 ) 

243 ) 

244 

245 message = Message( 

246 conversation_id=conversation.id, 

247 author_id=context.user_id, 

248 text=request.text, 

249 message_type=MessageType.text, 

250 ) 

251 session.add(message) 

252 session.flush() 

253 

254 # Create moderation state for UMS (starts as SHADOWED) 

255 moderation_state = create_moderation( 

256 session=session, 

257 object_type=ModerationObjectType.HOST_REQUEST, 

258 object_id=conversation.id, 

259 creator_user_id=context.user_id, 

260 ) 

261 

262 host_request = HostRequest( 

263 conversation_id=conversation.id, 

264 surfer_user_id=context.user_id, 

265 host_user_id=host.id, 

266 moderation_state_id=moderation_state.id, 

267 from_date=from_date, 

268 to_date=to_date, 

269 status=HostRequestStatus.pending, 

270 surfer_last_seen_message_id=message.id, 

271 # TODO: tz 

272 # timezone=host.timezone, 

273 hosting_city=host.city, 

274 hosting_location=host.geom, 

275 hosting_radius=host.geom_radius, 

276 ) 

277 session.add(host_request) 

278 session.flush() 

279 

280 notify( 

281 session, 

282 user_id=host_request.host_user_id, 

283 topic_action="host_request:create", 

284 key=str(host_request.conversation_id), 

285 data=notification_data_pb2.HostRequestCreate( 

286 host_request=host_request_to_pb(host_request, session, context), 

287 surfer=user_model_to_pb(host_request.surfer, session, context), 

288 text=request.text, 

289 ), 

290 moderation_state_id=moderation_state.id, 

291 ) 

292 

293 host_requests_sent_counter.labels(user.gender, host.gender).inc() 

294 sent_messages_counter.labels(user.gender, "host request send").inc() 

295 account_age_on_host_request_create_histogram.labels(user.gender, host.gender).observe( 

296 (now() - user.joined).total_seconds() 

297 ) 

298 

299 return requests_pb2.CreateHostRequestRes(host_request_id=host_request.conversation_id) 

300 

301 def GetHostRequest(self, request, context, session): 

302 host_request = session.execute( 

303 select(HostRequest) 

304 .where_users_column_visible(context, HostRequest.surfer_user_id) 

305 .where_users_column_visible(context, HostRequest.host_user_id) 

306 .where_moderated_content_visible(context, HostRequest, is_list_operation=False) 

307 .where(HostRequest.conversation_id == request.host_request_id) 

308 .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id)) 

309 ).scalar_one_or_none() 

310 

311 if not host_request: 

312 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

313 

314 return host_request_to_pb(host_request, session, context) 

315 

316 def ListHostRequests(self, request, context, session): 

317 if request.only_sent and request.only_received: 

318 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received") 

319 

320 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH 

321 pagination = min(pagination, MAX_PAGE_SIZE) 

322 

323 # By outer joining messages on itself where the second id is bigger, only the highest IDs will have 

324 # none as message_2.id. So just filter for these ones to get highest messages only. 

325 # See https://stackoverflow.com/a/27802817/6115336 

326 message_2 = aliased(Message) 

327 statement = ( 

328 select(Message, HostRequest, Conversation) 

329 .outerjoin(message_2, and_(Message.conversation_id == message_2.conversation_id, Message.id < message_2.id)) 

330 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id) 

331 .join(Conversation, Conversation.id == HostRequest.conversation_id) 

332 .where_users_column_visible(context, HostRequest.surfer_user_id) 

333 .where_users_column_visible(context, HostRequest.host_user_id) 

334 .where_moderated_content_visible(context, HostRequest, is_list_operation=True) 

335 .where(message_2.id == None) 

336 .where(or_(Message.id < request.last_request_id, request.last_request_id == 0)) 

337 ) 

338 

339 if request.only_sent: 

340 statement = statement.where(HostRequest.surfer_user_id == context.user_id) 

341 elif request.only_received: 

342 statement = statement.where(HostRequest.host_user_id == context.user_id) 

343 elif request.HasField("only_archived"): 

344 statement = statement.where( 

345 or_( 

346 and_( 

347 HostRequest.surfer_user_id == context.user_id, 

348 HostRequest.is_surfer_archived == request.only_archived, 

349 ), 

350 and_( 

351 HostRequest.host_user_id == context.user_id, 

352 HostRequest.is_host_archived == request.only_archived, 

353 ), 

354 ) 

355 ) 

356 else: 

357 statement = statement.where( 

358 or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id) 

359 ) 

360 

361 # TODO: I considered having the latest control message be the single source of truth for 

362 # the HostRequest.status, but decided against it because of this filter. 

363 # Another possibility is to filter in the python instead of SQL, but that's slower 

364 if request.only_active: 

365 statement = statement.where( 

366 or_( 

367 HostRequest.status == HostRequestStatus.pending, 

368 HostRequest.status == HostRequestStatus.accepted, 

369 HostRequest.status == HostRequestStatus.confirmed, 

370 ) 

371 ) 

372 statement = statement.where(HostRequest.end_time <= func.now()) 

373 

374 statement = statement.order_by(Message.id.desc()).limit(pagination + 1) 

375 results = session.execute(statement).all() 

376 

377 host_requests = [] 

378 for result in results[:pagination]: 

379 lat, lng = get_coordinates(result.HostRequest.hosting_location) 

380 host_requests.append( 

381 requests_pb2.HostRequest( 

382 host_request_id=result.HostRequest.conversation_id, 

383 surfer_user_id=result.HostRequest.surfer_user_id, 

384 host_user_id=result.HostRequest.host_user_id, 

385 status=hostrequeststatus2api[result.HostRequest.status], 

386 created=Timestamp_from_datetime(result.Conversation.created), 

387 from_date=date_to_api(result.HostRequest.from_date), 

388 to_date=date_to_api(result.HostRequest.to_date), 

389 last_seen_message_id=( 

390 result.HostRequest.surfer_last_seen_message_id 

391 if context.user_id == result.HostRequest.surfer_user_id 

392 else result.HostRequest.host_last_seen_message_id 

393 ), 

394 latest_message=message_to_pb(result.Message), 

395 hosting_city=result.HostRequest.hosting_city, 

396 hosting_lat=lat, 

397 hosting_lng=lng, 

398 hosting_radius=result.HostRequest.hosting_radius, 

399 ) 

400 ) 

401 

402 last_request_id = min(g.Message.id for g in results[:pagination]) if len(results) > pagination else 0 # TODO 

403 no_more = len(results) <= pagination 

404 

405 return requests_pb2.ListHostRequestsRes( 

406 last_request_id=last_request_id, no_more=no_more, host_requests=host_requests 

407 ) 

408 

    def RespondHostRequest(self, request, context, session):
        """
        Changes the status of a host request and optionally attaches a text message.

        The host may accept or reject; the surfer may confirm (only an accepted request) or cancel. Each
        successful transition updates the request's status, notifies the other party and bumps the response
        metrics. A control message recording the new status is then added to the conversation, followed by
        a text message if `request.text` is set, and the caller's last-seen marker is moved to the newest
        of these.

        Aborts with NOT_FOUND if the request is not visible to the caller or the caller is not part of it,
        with PERMISSION_DENIED for transitions that are not allowed, and with INVALID_ARGUMENT if the
        request's end time has passed.
        """

        def count_host_response(other_user_id, response_type):
            # Increments the response counter (labelled by both users' genders and the response type) and
            # the sent messages counter for the calling user.
            user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
            other_gender = session.execute(select(User.gender).where(User.id == other_user_id)).scalar_one()
            host_request_responses_counter.labels(user_gender, other_gender, response_type).inc()
            sent_messages_counter.labels(user_gender, "host request response").inc()

        host_request = session.execute(
            select(HostRequest)
            .where_users_column_visible(context, HostRequest.surfer_user_id)
            .where_users_column_visible(context, HostRequest.host_user_id)
            .where_moderated_content_visible(context, HostRequest, is_list_operation=False)
            .where(HostRequest.conversation_id == request.host_request_id)
        ).scalar_one_or_none()

        if not host_request:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        # users who are not part of the request get the same error as for a missing request
        if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        # nobody can move a request back to pending
        if request.status == conversations_pb2.HOST_REQUEST_STATUS_PENDING:
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")

        if host_request.end_time < now():
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_in_past")

        # filled in by the matching branch below and added to the session once the transition went through
        control_message = Message()

        if request.status == conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED:
            # only host can accept
            if context.user_id != host_request.host_user_id:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "not_the_host")
            # can't accept a cancelled or confirmed request (only reject), or already accepted
            if (
                host_request.status == HostRequestStatus.cancelled
                or host_request.status == HostRequestStatus.confirmed
                or host_request.status == HostRequestStatus.accepted
            ):
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            # has to run before any message by the host is added, since it counts the host's messages
            _possibly_observe_first_response_time(session, host_request, context.user_id, "accepted")
            control_message.host_request_status_target = HostRequestStatus.accepted
            host_request.status = HostRequestStatus.accepted
            session.flush()

            notify(
                session,
                user_id=host_request.surfer_user_id,
                topic_action="host_request:accept",
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestAccept(
                    host_request=host_request_to_pb(host_request, session, context),
                    host=user_model_to_pb(host_request.host, session, context),
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

            count_host_response(host_request.surfer_user_id, "accepted")

        if request.status == conversations_pb2.HOST_REQUEST_STATUS_REJECTED:
            # only host can reject
            if context.user_id != host_request.host_user_id:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            # can't reject a cancelled or already rejected request
            if host_request.status == HostRequestStatus.cancelled or host_request.status == HostRequestStatus.rejected:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            # has to run before any message by the host is added, since it counts the host's messages
            _possibly_observe_first_response_time(session, host_request, context.user_id, "rejected")
            control_message.host_request_status_target = HostRequestStatus.rejected
            host_request.status = HostRequestStatus.rejected
            session.flush()

            notify(
                session,
                user_id=host_request.surfer_user_id,
                topic_action="host_request:reject",
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestReject(
                    host_request=host_request_to_pb(host_request, session, context),
                    host=user_model_to_pb(host_request.host, session, context),
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

            count_host_response(host_request.surfer_user_id, "rejected")

        if request.status == conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED:
            # only surfer can confirm
            if context.user_id != host_request.surfer_user_id:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            # can only confirm an accepted request
            if host_request.status != HostRequestStatus.accepted:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            control_message.host_request_status_target = HostRequestStatus.confirmed
            host_request.status = HostRequestStatus.confirmed
            session.flush()

            notify(
                session,
                user_id=host_request.host_user_id,
                topic_action="host_request:confirm",
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestConfirm(
                    host_request=host_request_to_pb(host_request, session, context),
                    surfer=user_model_to_pb(host_request.surfer, session, context),
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

            count_host_response(host_request.host_user_id, "confirmed")

        if request.status == conversations_pb2.HOST_REQUEST_STATUS_CANCELLED:
            # only surfer can cancel
            if context.user_id != host_request.surfer_user_id:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            # can't cancel an already cancelled or rejected request
            if host_request.status == HostRequestStatus.rejected or host_request.status == HostRequestStatus.cancelled:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            control_message.host_request_status_target = HostRequestStatus.cancelled
            host_request.status = HostRequestStatus.cancelled
            session.flush()

            notify(
                session,
                user_id=host_request.host_user_id,
                topic_action="host_request:cancel",
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestCancel(
                    host_request=host_request_to_pb(host_request, session, context),
                    surfer=user_model_to_pb(host_request.surfer, session, context),
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

            count_host_response(host_request.host_user_id, "cancelled")

        # record the status change in the conversation as a control message authored by the caller
        control_message.message_type = MessageType.host_request_status_changed
        control_message.conversation_id = host_request.conversation_id
        control_message.author_id = context.user_id
        session.add(control_message)

        # an optional text message goes in after the control message
        if request.text:
            latest_message = Message()
            latest_message.conversation_id = host_request.conversation_id
            latest_message.text = request.text
            latest_message.author_id = context.user_id
            latest_message.message_type = MessageType.text
            session.add(latest_message)
        else:
            latest_message = control_message

        # flush so latest_message.id is populated
        session.flush()

        # the caller has by definition seen everything up to the message they just created
        if host_request.surfer_user_id == context.user_id:
            host_request.surfer_last_seen_message_id = latest_message.id
        else:
            host_request.host_last_seen_message_id = latest_message.id
        session.commit()

        return empty_pb2.Empty()

568 

569 def GetHostRequestMessages(self, request, context, session): 

570 host_request = session.execute( 

571 select(HostRequest) 

572 .where_moderated_content_visible(context, HostRequest, is_list_operation=False) 

573 .where(HostRequest.conversation_id == request.host_request_id) 

574 ).scalar_one_or_none() 

575 

576 if not host_request: 

577 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

578 

579 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 

580 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

581 

582 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH 

583 pagination = min(pagination, MAX_PAGE_SIZE) 

584 

585 messages = ( 

586 session.execute( 

587 select(Message) 

588 .where(Message.conversation_id == host_request.conversation_id) 

589 .where(or_(Message.id < request.last_message_id, request.last_message_id == 0)) 

590 .order_by(Message.id.desc()) 

591 .limit(pagination + 1) 

592 ) 

593 .scalars() 

594 .all() 

595 ) 

596 

597 no_more = len(messages) <= pagination 

598 

599 last_message_id = min(m.id if m else 1 for m in messages[:pagination]) if len(messages) > 0 else 0 

600 

601 return requests_pb2.GetHostRequestMessagesRes( 

602 last_message_id=last_message_id, 

603 no_more=no_more, 

604 messages=[message_to_pb(message) for message in messages[:pagination]], 

605 ) 

606 

607 def SendHostRequestMessage(self, request, context, session): 

608 if request.text == "": 

609 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message") 

610 host_request = session.execute( 

611 select(HostRequest) 

612 .where_moderated_content_visible(context, HostRequest, is_list_operation=False) 

613 .where(HostRequest.conversation_id == request.host_request_id) 

614 ).scalar_one_or_none() 

615 

616 if not host_request: 

617 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

618 

619 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 

620 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

621 

622 if host_request.host_user_id == context.user_id: 

623 _possibly_observe_first_response_time(session, host_request, context.user_id, "message") 

624 

625 message = Message() 

626 message.conversation_id = host_request.conversation_id 

627 message.author_id = context.user_id 

628 message.message_type = MessageType.text 

629 message.text = request.text 

630 session.add(message) 

631 session.flush() 

632 

633 if host_request.surfer_user_id == context.user_id: 

634 host_request.surfer_last_seen_message_id = message.id 

635 

636 notify( 

637 session, 

638 user_id=host_request.host_user_id, 

639 topic_action="host_request:message", 

640 key=str(host_request.conversation_id), 

641 data=notification_data_pb2.HostRequestMessage( 

642 host_request=host_request_to_pb(host_request, session, context), 

643 user=user_model_to_pb(host_request.surfer, session, context), 

644 text=request.text, 

645 am_host=True, 

646 ), 

647 moderation_state_id=host_request.moderation_state_id, 

648 ) 

649 

650 else: 

651 host_request.host_last_seen_message_id = message.id 

652 

653 notify( 

654 session, 

655 user_id=host_request.surfer_user_id, 

656 topic_action="host_request:message", 

657 key=str(host_request.conversation_id), 

658 data=notification_data_pb2.HostRequestMessage( 

659 host_request=host_request_to_pb(host_request, session, context), 

660 user=user_model_to_pb(host_request.host, session, context), 

661 text=request.text, 

662 am_host=False, 

663 ), 

664 moderation_state_id=host_request.moderation_state_id, 

665 ) 

666 

667 session.commit() 

668 

669 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one() 

670 sent_messages_counter.labels(user_gender, "host request").inc() 

671 

672 return empty_pb2.Empty() 

673 

674 def GetHostRequestUpdates(self, request, context, session): 

675 if request.only_sent and request.only_received: 

676 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received") 

677 

678 if request.newest_message_id == 0: 

679 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message") 

680 

681 if not session.execute(select(Message).where(Message.id == request.newest_message_id)).scalar_one_or_none(): 

682 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message") 

683 

684 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH 

685 pagination = min(pagination, MAX_PAGE_SIZE) 

686 

687 statement = ( 

688 select( 

689 Message, 

690 HostRequest.status.label("host_request_status"), 

691 HostRequest.conversation_id.label("host_request_id"), 

692 ) 

693 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id) 

694 .where_moderated_content_visible(context, HostRequest, is_list_operation=False) 

695 .where(Message.id > request.newest_message_id) 

696 ) 

697 

698 if request.only_sent: 

699 statement = statement.where(HostRequest.surfer_user_id == context.user_id) 

700 elif request.only_received: 

701 statement = statement.where(HostRequest.host_user_id == context.user_id) 

702 else: 

703 statement = statement.where( 

704 or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id) 

705 ) 

706 

707 statement = statement.order_by(Message.id.asc()).limit(pagination + 1) 

708 res = session.execute(statement).all() 

709 

710 no_more = len(res) <= pagination 

711 

712 last_message_id = min(m.Message.id if m else 1 for m in res[:pagination]) if len(res) > 0 else 0 # TODO 

713 

714 return requests_pb2.GetHostRequestUpdatesRes( 

715 no_more=no_more, 

716 updates=[ 

717 requests_pb2.HostRequestUpdate( 

718 host_request_id=result.host_request_id, 

719 status=hostrequeststatus2api[result.host_request_status], 

720 message=message_to_pb(result.Message), 

721 ) 

722 for result in res[:pagination] 

723 ], 

724 ) 

725 

726 def MarkLastSeenHostRequest(self, request, context, session): 

727 host_request = session.execute( 

728 select(HostRequest) 

729 .where_moderated_content_visible(context, HostRequest, is_list_operation=False) 

730 .where(HostRequest.conversation_id == request.host_request_id) 

731 ).scalar_one_or_none() 

732 

733 if not host_request: 

734 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

735 

736 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 

737 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

738 

739 if host_request.surfer_user_id == context.user_id: 

740 if not host_request.surfer_last_seen_message_id <= request.last_seen_message_id: 

741 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages") 

742 host_request.surfer_last_seen_message_id = request.last_seen_message_id 

743 else: 

744 if not host_request.host_last_seen_message_id <= request.last_seen_message_id: 

745 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages") 

746 host_request.host_last_seen_message_id = request.last_seen_message_id 

747 

748 session.commit() 

749 return empty_pb2.Empty() 

750 

751 def SetHostRequestArchiveStatus(self, request, context, session): 

752 host_request: HostRequest = session.execute( 

753 select(HostRequest) 

754 .where_moderated_content_visible(context, HostRequest, is_list_operation=False) 

755 .where(HostRequest.conversation_id == request.host_request_id) 

756 .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id)) 

757 ).scalar_one_or_none() 

758 

759 if not host_request: 

760 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

761 

762 if context.user_id == host_request.surfer_user_id: 

763 host_request.is_surfer_archived = request.is_archived 

764 else: 

765 host_request.is_host_archived = request.is_archived 

766 

767 return requests_pb2.SetHostRequestArchiveStatusRes( 

768 host_request_id=host_request.conversation_id, 

769 is_archived=request.is_archived, 

770 ) 

771 

772 def GetResponseRate(self, request, context, session): 

773 user_res = session.execute( 

774 select(User.id, UserResponseRate) 

775 .outerjoin(UserResponseRate, UserResponseRate.user_id == User.id) 

776 .where_users_visible(context) 

777 .where(User.id == request.user_id) 

778 ).one_or_none() 

779 

780 # if user doesn't exist, return None 

781 if not user_res: 

782 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

783 

784 user, response_rates = user_res 

785 return requests_pb2.GetResponseRateRes(**response_rate_to_pb(response_rates)) 

786 

787 def SendHostRequestFeedback(self, request, context, session): 

788 host_request = session.execute( 

789 select(HostRequest) 

790 .where_moderated_content_visible(context, HostRequest, is_list_operation=False) 

791 .where(HostRequest.conversation_id == request.host_request_id) 

792 .where(HostRequest.host_user_id == context.user_id) 

793 ).scalar_one_or_none() 

794 

795 if not host_request: 

796 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

797 

798 feedback = session.execute( 

799 select(HostRequestFeedback) 

800 .where(HostRequestFeedback.host_request_id == host_request.conversation_id) 

801 .where(HostRequestFeedback.from_user_id == context.user_id) 

802 ).scalar_one_or_none() 

803 

804 if feedback: 

805 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_left_host_request_feedback") 

806 

807 session.add( 

808 HostRequestFeedback( 

809 host_request_id=host_request.conversation_id, 

810 from_user_id=host_request.host_user_id, 

811 to_user_id=host_request.surfer_user_id, 

812 request_quality=hostrequestquality2sql.get(request.host_request_quality), 

813 decline_reason=request.decline_reason, 

814 ) 

815 ) 

816 

817 return empty_pb2.Empty()