Coverage for app / backend / src / couchers / servicers / requests.py: 92%

313 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 09:44 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy import exists, select 

7from sqlalchemy.orm import Session, aliased 

8from sqlalchemy.sql import and_, func, or_ 

9 

10from couchers.constants import HOST_REQUEST_MIN_LENGTH_UTF16 

11from couchers.context import CouchersContext 

12from couchers.db import can_moderate_node 

13from couchers.event_log import log_event 

14from couchers.helpers.completed_profile import has_completed_profile 

15from couchers.materialized_views import UserResponseRate 

16from couchers.metrics import ( 

17 account_age_on_host_request_create_histogram, 

18 host_request_first_response_histogram, 

19 host_request_responses_counter, 

20 host_requests_sent_counter, 

21 sent_messages_counter, 

22) 

23from couchers.models import ( 

24 Conversation, 

25 HostRequest, 

26 HostRequestFeedback, 

27 HostRequestQuality, 

28 HostRequestStatus, 

29 Message, 

30 MessageType, 

31 ModerationObjectType, 

32 RateLimitAction, 

33 User, 

34) 

35from couchers.models.notifications import NotificationTopicAction 

36from couchers.models.public_trips import PublicTrip, PublicTripStatus 

37from couchers.moderation.utils import create_moderation 

38from couchers.notifications.notify import notify 

39from couchers.proto import conversations_pb2, notification_data_pb2, requests_pb2, requests_pb2_grpc 

40from couchers.rate_limits.check import process_rate_limits_and_check_abort 

41from couchers.rate_limits.definitions import RATE_LIMIT_HOURS 

42from couchers.servicers.api import response_rate_to_pb, user_model_to_pb 

43from couchers.sql import to_bool, users_visible, where_moderated_content_visible, where_users_column_visible 

44from couchers.utils import ( 

45 Timestamp_from_datetime, 

46 date_to_api, 

47 get_coordinates, 

48 now, 

49 parse_date, 

50 today_in_timezone, 

51) 

52 

# Logger for this module, keyed by the module's import path.
logger = logging.getLogger(__name__)

# Page size used when a list request does not ask for a positive number of items.
DEFAULT_PAGINATION_LENGTH = 10
# Hard upper bound applied to any requested page size.
MAX_PAGE_SIZE = 50


# Translation table from the database-side HostRequestStatus enum to the
# corresponding status constants exposed by the conversations API.
hostrequeststatus2api = {
    HostRequestStatus.pending: conversations_pb2.HOST_REQUEST_STATUS_PENDING,
    HostRequestStatus.accepted: conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
    HostRequestStatus.rejected: conversations_pb2.HOST_REQUEST_STATUS_REJECTED,
    HostRequestStatus.confirmed: conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED,
    HostRequestStatus.cancelled: conversations_pb2.HOST_REQUEST_STATUS_CANCELLED,
}

66 

# Translation table from the API HostRequestQuality enum to the database-side
# HostRequestQuality enum.
#
# The LOW and OKAY entries previously pointed at each other's database value
# (LOW -> okay_quality, OKAY -> low_quality); each now maps to its namesake.
#
# NOTE(review): UNSPECIFIED is mapped to high_quality and no explicit "high"
# API value is handled here. That mapping is kept as-is because the API enum
# definition is not visible from this file -- confirm it is intentional.
hostrequestquality2sql = {
    requests_pb2.HOST_REQUEST_QUALITY_UNSPECIFIED: HostRequestQuality.high_quality,
    requests_pb2.HOST_REQUEST_QUALITY_LOW: HostRequestQuality.low_quality,
    requests_pb2.HOST_REQUEST_QUALITY_OKAY: HostRequestQuality.okay_quality,
}

72 

73 

def message_to_pb(message: Message) -> conversations_pb2.Message:
    """
    Serialize a Message row into its protocol buffer representation.

    Normal messages are emitted with their text content. Any other message is
    treated as a control message: the `chat_created` or
    `host_request_status_changed` payload is filled in according to the
    message type, and left unset otherwise.
    """
    if message.is_normal_message:
        text_content = conversations_pb2.MessageContentText(text=message.text)
        return conversations_pb2.Message(
            message_id=message.id,
            author_user_id=message.author_id,
            time=Timestamp_from_datetime(message.time),
            text=text_content,
        )

    message_id = message.id
    author_user_id = message.author_id
    sent_at = Timestamp_from_datetime(message.time)

    chat_created_content = None
    if message.message_type == MessageType.chat_created:
        chat_created_content = conversations_pb2.MessageContentChatCreated()

    status_changed_content = None
    if message.message_type == MessageType.host_request_status_changed:
        status_changed_content = conversations_pb2.MessageContentHostRequestStatusChanged(
            status=hostrequeststatus2api[message.host_request_status_target]  # type: ignore[index]
        )

    return conversations_pb2.Message(
        message_id=message_id,
        author_user_id=author_user_id,
        time=sent_at,
        chat_created=chat_created_content,
        host_request_status_changed=status_changed_content,
    )

103 

104 

def host_request_to_pb(
    host_request: HostRequest, session: Session, context: CouchersContext
) -> requests_pb2.HostRequest:
    """
    Build the API representation of a host request as seen by the calling user.

    Looks up the first and the latest message of the request's conversation,
    and picks the viewer-specific fields (last seen message, archived flag,
    whether feedback is still needed) based on whether `context.user_id` is
    the initiator or the recipient of the request.
    """
    in_conversation = Message.conversation_id == host_request.conversation_id

    # The oldest message of the conversation supplies the `created` timestamp.
    initial_message = session.execute(
        select(Message).where(in_conversation).order_by(Message.id.asc()).limit(1)
    ).scalar_one()

    # The newest message of the conversation is embedded as `latest_message`.
    latest_message = session.execute(
        select(Message).where(in_conversation).order_by(Message.id.desc()).limit(1)
    ).scalar_one()

    lat, lng = get_coordinates(host_request.hosting_location)

    # Feedback is only requested from the recipient of a rejected request, and
    # only while that user has not already left feedback for it.
    need_feedback = False
    if context.user_id == host_request.recipient_user_id and host_request.status == HostRequestStatus.rejected:
        feedback_exists = session.execute(
            select(
                exists().where(
                    HostRequestFeedback.from_user_id == context.user_id,
                    HostRequestFeedback.host_request_id == host_request.conversation_id,
                )
            )
        ).scalar_one()
        need_feedback = not feedback_exists

    if context.user_id == host_request.initiator_user_id:
        last_seen_message_id = host_request.initiator_last_seen_message_id
    else:
        last_seen_message_id = host_request.recipient_last_seen_message_id

    if context.user_id == host_request.recipient_user_id:
        is_archived = host_request.is_recipient_archived
    else:
        is_archived = host_request.is_initiator_archived

    return requests_pb2.HostRequest(
        host_request_id=host_request.conversation_id,
        surfer_user_id=host_request.initiator_user_id,
        host_user_id=host_request.recipient_user_id,
        status=hostrequeststatus2api[host_request.status],
        created=Timestamp_from_datetime(initial_message.time),
        from_date=date_to_api(host_request.from_date),
        to_date=date_to_api(host_request.to_date),
        last_seen_message_id=last_seen_message_id,
        latest_message=message_to_pb(latest_message),
        hosting_city=host_request.hosting_city,
        hosting_lat=lat,
        hosting_lng=lng,
        hosting_radius=host_request.hosting_radius,
        need_host_request_feedback=need_feedback,
        is_archived=is_archived,
        public_trip_id=host_request.public_trip_id,
    )

161 

162 

def _possibly_observe_first_response_time(
    session: Session, host_request: HostRequest, user_id: int, response_type: str
) -> None:
    """
    Record how long the recipient took to first respond to a host request.

    Must be called for the recipient of the request, before the response
    itself is stored: the observation is only made when that user has not yet
    authored any message in the request's conversation.
    """
    assert host_request.recipient_user_id == user_id

    # If this is the first response, there is nothing by this user in the conversation yet.
    messages_by_recipient = session.execute(
        select(func.count()).where(
            Message.conversation_id == host_request.conversation_id,
            Message.author_id == user_id,
        )
    ).scalar_one_or_none()

    if messages_by_recipient != 0:
        return

    host_gender = session.execute(select(User.gender).where(User.id == host_request.recipient_user_id)).scalar_one()
    surfer_gender = session.execute(select(User.gender).where(User.id == host_request.initiator_user_id)).scalar_one()

    histogram = host_request_first_response_histogram.labels(host_gender, surfer_gender, response_type)
    # Response time is measured from the creation of the conversation until now.
    histogram.observe((now() - host_request.conversation.created).total_seconds())

183 

184 

def _is_host_request_long_enough(text: str) -> bool:
    """
    Tell whether `text` meets the minimum host request length.

    The length is measured in UTF-16 code units rather than with Python's
    len(), which counts code points. The two differ for characters outside the
    Basic Multilingual Plane: U+1F600 has len() == 1 in Python but a string
    length of 2 in JavaScript. Counting UTF-16 code units matches the
    frontend's validation.
    """
    # "utf-16-le" emits no BOM, so the encoded size is exactly two bytes per code unit.
    encoded = text.encode("utf-16-le")
    utf16_code_units = len(encoded) // 2
    return utf16_code_units >= HOST_REQUEST_MIN_LENGTH_UTF16

191 

192 

193class Requests(requests_pb2_grpc.RequestsServicer): 

194 def CreateHostRequest( 

195 self, request: requests_pb2.CreateHostRequestReq, context: CouchersContext, session: Session 

196 ) -> requests_pb2.CreateHostRequestRes: 

197 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

198 if not has_completed_profile(session, user): 

199 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_request") 

200 

201 if request.host_user_id == context.user_id: 

202 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_request_self") 

203 

204 # just to check recipient exists and is visible 

205 recipient = session.execute( 

206 select(User).where(users_visible(context, User)).where(User.id == request.host_user_id) 

207 ).scalar_one_or_none() 

208 if not recipient: 

209 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

210 

211 from_date = parse_date(request.from_date) 

212 to_date = parse_date(request.to_date) 

213 

214 if not from_date or not to_date: 

215 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_date") 

216 

217 today = today_in_timezone(recipient.timezone) 

218 

219 # request starts from the past 

220 if from_date < today: 

221 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_before_today") 

222 

223 # from_date is not >= to_date 

224 if from_date >= to_date: 

225 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_to") 

226 

227 # No need to check today > to_date 

228 

229 if from_date - today > timedelta(days=365): 

230 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_one_year") 

231 

232 if to_date - from_date > timedelta(days=365): 

233 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_to_after_one_year") 

234 

235 # Check minimum length 

236 if not _is_host_request_long_enough(request.text): 

237 context.abort_with_error_code( 

238 grpc.StatusCode.INVALID_ARGUMENT, 

239 "host_request_too_short2", 

240 substitutions={"count": HOST_REQUEST_MIN_LENGTH_UTF16}, 

241 ) 

242 

243 # Check if user has been sending host requests excessively 

244 if process_rate_limits_and_check_abort( 

245 session=session, user_id=context.user_id, action=RateLimitAction.host_request 

246 ): 

247 context.abort_with_error_code( 

248 grpc.StatusCode.RESOURCE_EXHAUSTED, 

249 "host_request_rate_limit2", 

250 substitutions={"count": RATE_LIMIT_HOURS}, 

251 ) 

252 

253 # If this is an offer in response to a public trip, validate it 

254 public_trip_id = request.public_trip_id if request.HasField("public_trip_id") else None 

255 if public_trip_id is not None: 

256 public_trip = session.execute( 

257 select(PublicTrip).where(PublicTrip.id == public_trip_id) 

258 ).scalar_one_or_none() 

259 if not public_trip: 

260 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "public_trip_not_found") 

261 # The trip's traveler must be the recipient of this host request (role reversal) 

262 if public_trip.user_id != recipient.id: 

263 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "public_trip_user_mismatch") 

264 # Trip must still be active 

265 if public_trip.status != PublicTripStatus.searching_for_host: 

266 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "public_trip_not_active") 

267 # Offered dates must fall within the trip's window (host can shorten, not extend) 

268 if from_date < public_trip.from_date or to_date > public_trip.to_date: 

269 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "public_trip_dates_out_of_range") 

270 # Enforce same_gender_only restriction (community moderators bypass) 

271 if ( 

272 public_trip.same_gender_only 

273 and not can_moderate_node(session, context.user_id, public_trip.node_id) 

274 and user.gender != recipient.gender 

275 ): 

276 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "public_trip_same_gender_only") 

277 # Prevent duplicate offers on the same trip 

278 existing_offer = session.execute( 

279 select(HostRequest) 

280 .where(HostRequest.public_trip_id == public_trip_id) 

281 .where(HostRequest.initiator_user_id == context.user_id) 

282 ).scalar_one_or_none() 

283 if existing_offer: 

284 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "duplicate_host_request_for_trip") 

285 

286 conversation = Conversation() 

287 session.add(conversation) 

288 session.flush() 

289 

290 session.add( 

291 Message( 

292 conversation_id=conversation.id, 

293 author_id=context.user_id, 

294 message_type=MessageType.chat_created, 

295 ) 

296 ) 

297 

298 message = Message( 

299 conversation_id=conversation.id, 

300 author_id=context.user_id, 

301 text=request.text, 

302 message_type=MessageType.text, 

303 ) 

304 session.add(message) 

305 session.flush() 

306 

307 # Create moderation state for UMS (starts as SHADOWED) 

308 moderation_state = create_moderation( 

309 session=session, 

310 object_type=ModerationObjectType.host_request, 

311 object_id=conversation.id, 

312 creator_user_id=context.user_id, 

313 ) 

314 

315 host_request = HostRequest( 

316 conversation_id=conversation.id, 

317 initiator_user_id=context.user_id, 

318 recipient_user_id=recipient.id, 

319 moderation_state_id=moderation_state.id, 

320 from_date=from_date, 

321 to_date=to_date, 

322 status=HostRequestStatus.pending, 

323 initiator_last_seen_message_id=message.id, 

324 # TODO: tz 

325 # timezone=recipient.timezone, 

326 hosting_city=recipient.city, 

327 hosting_location=recipient.geom, 

328 hosting_radius=recipient.geom_radius, 

329 public_trip_id=public_trip_id, 

330 ) 

331 session.add(host_request) 

332 session.flush() 

333 

334 notify( 

335 session, 

336 user_id=host_request.recipient_user_id, 

337 topic_action=NotificationTopicAction.host_request__create, 

338 key=str(host_request.conversation_id), 

339 data=notification_data_pb2.HostRequestCreate( 

340 host_request=host_request_to_pb(host_request, session, context), 

341 surfer=user_model_to_pb(host_request.initiator, session, context), 

342 text=request.text, 

343 ), 

344 moderation_state_id=moderation_state.id, 

345 ) 

346 

347 host_requests_sent_counter.labels(user.gender, recipient.gender).inc() 

348 sent_messages_counter.labels(user.gender, "host request send").inc() 

349 account_age_on_host_request_create_histogram.labels(user.gender, recipient.gender).observe( 

350 (now() - user.joined).total_seconds() 

351 ) 

352 log_event( 

353 context, 

354 session, 

355 "host_request.created", 

356 { 

357 "host_request_id": host_request.conversation_id, 

358 "host_id": recipient.id, 

359 "surfer_gender": user.gender, 

360 "host_gender": recipient.gender, 

361 "city": recipient.city, 

362 "from_date": str(from_date), 

363 "to_date": str(to_date), 

364 "nights": (to_date - from_date).days, 

365 }, 

366 ) 

367 

368 return requests_pb2.CreateHostRequestRes(host_request_id=host_request.conversation_id) 

369 

370 def GetHostRequest( 

371 self, request: requests_pb2.GetHostRequestReq, context: CouchersContext, session: Session 

372 ) -> requests_pb2.HostRequest: 

373 host_request = session.execute( 

374 where_moderated_content_visible( 

375 where_users_column_visible( 

376 where_users_column_visible( 

377 select(HostRequest), 

378 context, 

379 HostRequest.initiator_user_id, 

380 ), 

381 context, 

382 HostRequest.recipient_user_id, 

383 ), 

384 context, 

385 HostRequest, 

386 is_list_operation=False, 

387 ) 

388 .where(HostRequest.conversation_id == request.host_request_id) 

389 .where( 

390 or_(HostRequest.initiator_user_id == context.user_id, HostRequest.recipient_user_id == context.user_id) 

391 ) 

392 ).scalar_one_or_none() 

393 

394 if not host_request: 

395 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

396 

397 return host_request_to_pb(host_request, session, context) 

398 

399 def ListHostRequests( 

400 self, request: requests_pb2.ListHostRequestsReq, context: CouchersContext, session: Session 

401 ) -> requests_pb2.ListHostRequestsRes: 

402 if request.only_sent and request.only_received: 402 ↛ 403line 402 didn't jump to line 403 because the condition on line 402 was never true

403 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received") 

404 

405 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH 

406 pagination = min(pagination, MAX_PAGE_SIZE) 

407 

408 # By outer joining messages on itself where the second id is bigger, only the highest IDs will have 

409 # none as message_2.id. So just filter for these to get the highest messages only. 

410 # See https://stackoverflow.com/a/27802817/6115336 

411 message_2 = aliased(Message) 

412 statement = where_moderated_content_visible( 

413 where_users_column_visible( 

414 where_users_column_visible( 

415 select(Message, HostRequest, Conversation) 

416 .outerjoin( 

417 message_2, and_(Message.conversation_id == message_2.conversation_id, Message.id < message_2.id) 

418 ) 

419 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id) 

420 .join(Conversation, Conversation.id == HostRequest.conversation_id), 

421 context, 

422 HostRequest.initiator_user_id, 

423 ), 

424 context, 

425 HostRequest.recipient_user_id, 

426 ), 

427 context, 

428 HostRequest, 

429 is_list_operation=True, 

430 ).where(message_2.id == None) 

431 

432 if request.last_request_id != 0: 

433 statement = statement.where(Message.id < request.last_request_id) 

434 if request.only_sent: 

435 statement = statement.where(HostRequest.initiator_user_id == context.user_id) 

436 elif request.only_received: 

437 statement = statement.where(HostRequest.recipient_user_id == context.user_id) 

438 elif request.HasField("only_archived"): 

439 statement = statement.where( 

440 or_( 

441 and_( 

442 HostRequest.initiator_user_id == context.user_id, 

443 HostRequest.is_initiator_archived == request.only_archived, 

444 ), 

445 and_( 

446 HostRequest.recipient_user_id == context.user_id, 

447 HostRequest.is_recipient_archived == request.only_archived, 

448 ), 

449 ) 

450 ) 

451 else: 

452 statement = statement.where( 

453 or_(HostRequest.recipient_user_id == context.user_id, HostRequest.initiator_user_id == context.user_id) 

454 ) 

455 

456 # TODO: I considered having the latest control message be the single source of truth for 

457 # the HostRequest.status, but decided against it because of this filter. 

458 # Another possibility is to filter in the python instead of SQL, but that's slower 

459 if request.only_active: 

460 statement = statement.where( 

461 or_( 

462 HostRequest.status == HostRequestStatus.pending, 

463 HostRequest.status == HostRequestStatus.accepted, 

464 HostRequest.status == HostRequestStatus.confirmed, 

465 ) 

466 ) 

467 statement = statement.where(HostRequest.end_time <= func.now()) 

468 

469 statement = statement.order_by(Message.id.desc()).limit(pagination + 1) 

470 results = session.execute(statement).all() 

471 

472 host_requests = [] 

473 for result in results[:pagination]: 

474 lat, lng = get_coordinates(result.HostRequest.hosting_location) 

475 host_requests.append( 

476 requests_pb2.HostRequest( 

477 host_request_id=result.HostRequest.conversation_id, 

478 surfer_user_id=result.HostRequest.initiator_user_id, 

479 host_user_id=result.HostRequest.recipient_user_id, 

480 status=hostrequeststatus2api[result.HostRequest.status], 

481 created=Timestamp_from_datetime(result.Conversation.created), 

482 from_date=date_to_api(result.HostRequest.from_date), 

483 to_date=date_to_api(result.HostRequest.to_date), 

484 last_seen_message_id=( 

485 result.HostRequest.initiator_last_seen_message_id 

486 if context.user_id == result.HostRequest.initiator_user_id 

487 else result.HostRequest.recipient_last_seen_message_id 

488 ), 

489 latest_message=message_to_pb(result.Message), 

490 hosting_city=result.HostRequest.hosting_city, 

491 hosting_lat=lat, 

492 hosting_lng=lng, 

493 hosting_radius=result.HostRequest.hosting_radius, 

494 ) 

495 ) 

496 

497 last_request_id = min(g.Message.id for g in results[:pagination]) if len(results) > pagination else 0 # TODO 

498 no_more = len(results) <= pagination 

499 

500 return requests_pb2.ListHostRequestsRes( 

501 last_request_id=last_request_id, no_more=no_more, host_requests=host_requests 

502 ) 

503 

504 def RespondHostRequest( 

505 self, request: requests_pb2.RespondHostRequestReq, context: CouchersContext, session: Session 

506 ) -> empty_pb2.Empty: 

507 def count_host_response(other_user_id: int, response_type: str) -> None: 

508 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one() 

509 other_gender = session.execute(select(User.gender).where(User.id == other_user_id)).scalar_one() 

510 host_request_responses_counter.labels(user_gender, other_gender, response_type).inc() 

511 sent_messages_counter.labels(user_gender, "host request response").inc() 

512 

513 host_request = session.execute( 

514 where_moderated_content_visible( 

515 where_users_column_visible( 

516 where_users_column_visible( 

517 select(HostRequest), 

518 context, 

519 HostRequest.initiator_user_id, 

520 ), 

521 context, 

522 HostRequest.recipient_user_id, 

523 ), 

524 context, 

525 HostRequest, 

526 is_list_operation=False, 

527 ).where(HostRequest.conversation_id == request.host_request_id) 

528 ).scalar_one_or_none() 

529 

530 if not host_request: 

531 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

532 

533 if host_request.initiator_user_id != context.user_id and host_request.recipient_user_id != context.user_id: 

534 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

535 

536 if request.status == conversations_pb2.HOST_REQUEST_STATUS_PENDING: 

537 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

538 

539 if host_request.end_time < now(): 539 ↛ 540line 539 didn't jump to line 540 because the condition on line 539 was never true

540 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_in_past") 

541 

542 control_message = Message( 

543 message_type=MessageType.host_request_status_changed, 

544 conversation_id=host_request.conversation_id, 

545 author_id=context.user_id, 

546 ) 

547 

548 if request.status == conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED: 

549 # only host can accept 

550 if context.user_id != host_request.recipient_user_id: 

551 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "not_the_host") 

552 # can't accept a cancelled or confirmed request (only reject), or already accepted 

553 if ( 553 ↛ 558line 553 didn't jump to line 558 because the condition on line 553 was never true

554 host_request.status == HostRequestStatus.cancelled 

555 or host_request.status == HostRequestStatus.confirmed 

556 or host_request.status == HostRequestStatus.accepted 

557 ): 

558 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

559 _possibly_observe_first_response_time(session, host_request, context.user_id, "accepted") 

560 control_message.host_request_status_target = HostRequestStatus.accepted 

561 host_request.status = HostRequestStatus.accepted 

562 session.flush() 

563 

564 notify( 

565 session, 

566 user_id=host_request.initiator_user_id, 

567 topic_action=NotificationTopicAction.host_request__accept, 

568 key=str(host_request.conversation_id), 

569 data=notification_data_pb2.HostRequestAccept( 

570 host_request=host_request_to_pb(host_request, session, context), 

571 host=user_model_to_pb(host_request.recipient, session, context), 

572 ), 

573 moderation_state_id=host_request.moderation_state_id, 

574 ) 

575 

576 count_host_response(host_request.initiator_user_id, "accepted") 

577 log_event( 

578 context, 

579 session, 

580 "host_request.accepted", 

581 { 

582 "host_request_id": host_request.conversation_id, 

583 "surfer_id": host_request.initiator_user_id, 

584 "host_id": host_request.recipient_user_id, 

585 "surfer_gender": host_request.initiator.gender, 

586 "host_gender": host_request.recipient.gender, 

587 "from_date": str(host_request.from_date), 

588 "to_date": str(host_request.to_date), 

589 "host_city": host_request.hosting_city, 

590 }, 

591 ) 

592 

593 if request.status == conversations_pb2.HOST_REQUEST_STATUS_REJECTED: 

594 # only host can reject 

595 if context.user_id != host_request.recipient_user_id: 595 ↛ 596line 595 didn't jump to line 596 because the condition on line 595 was never true

596 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

597 # can't reject a cancelled or already rejected request 

598 if host_request.status == HostRequestStatus.cancelled or host_request.status == HostRequestStatus.rejected: 598 ↛ 599line 598 didn't jump to line 599 because the condition on line 598 was never true

599 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

600 _possibly_observe_first_response_time(session, host_request, context.user_id, "rejected") 

601 control_message.host_request_status_target = HostRequestStatus.rejected 

602 host_request.status = HostRequestStatus.rejected 

603 session.flush() 

604 

605 notify( 

606 session, 

607 user_id=host_request.initiator_user_id, 

608 topic_action=NotificationTopicAction.host_request__reject, 

609 key=str(host_request.conversation_id), 

610 data=notification_data_pb2.HostRequestReject( 

611 host_request=host_request_to_pb(host_request, session, context), 

612 host=user_model_to_pb(host_request.recipient, session, context), 

613 ), 

614 moderation_state_id=host_request.moderation_state_id, 

615 ) 

616 

617 count_host_response(host_request.initiator_user_id, "rejected") 

618 

619 log_event( 

620 context, 

621 session, 

622 "host_request.rejected", 

623 { 

624 "host_request_id": host_request.conversation_id, 

625 "surfer_id": host_request.initiator_user_id, 

626 "host_id": host_request.recipient_user_id, 

627 "surfer_gender": host_request.initiator.gender, 

628 "host_gender": host_request.recipient.gender, 

629 "from_date": str(host_request.from_date), 

630 "to_date": str(host_request.to_date), 

631 "host_city": host_request.hosting_city, 

632 }, 

633 ) 

634 

635 if request.status == conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED: 

636 # only surfer can confirm 

637 if context.user_id != host_request.initiator_user_id: 

638 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

639 # can only confirm an accepted request 

640 if host_request.status != HostRequestStatus.accepted: 

641 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

642 control_message.host_request_status_target = HostRequestStatus.confirmed 

643 host_request.status = HostRequestStatus.confirmed 

644 session.flush() 

645 

646 notify( 

647 session, 

648 user_id=host_request.recipient_user_id, 

649 topic_action=NotificationTopicAction.host_request__confirm, 

650 key=str(host_request.conversation_id), 

651 data=notification_data_pb2.HostRequestConfirm( 

652 host_request=host_request_to_pb(host_request, session, context), 

653 surfer=user_model_to_pb(host_request.initiator, session, context), 

654 ), 

655 moderation_state_id=host_request.moderation_state_id, 

656 ) 

657 

658 count_host_response(host_request.recipient_user_id, "confirmed") 

659 log_event( 

660 context, 

661 session, 

662 "host_request.confirmed", 

663 { 

664 "host_request_id": host_request.conversation_id, 

665 "surfer_id": host_request.initiator_user_id, 

666 "host_id": host_request.recipient_user_id, 

667 "surfer_gender": host_request.initiator.gender, 

668 "host_gender": host_request.recipient.gender, 

669 "from_date": str(host_request.from_date), 

670 "to_date": str(host_request.to_date), 

671 "host_city": host_request.hosting_city, 

672 }, 

673 ) 

674 

675 if request.status == conversations_pb2.HOST_REQUEST_STATUS_CANCELLED: 

676 # only surfer can cancel 

677 if context.user_id != host_request.initiator_user_id: 

678 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

679 # can't' cancel an already cancelled or rejected request 

680 if host_request.status == HostRequestStatus.rejected or host_request.status == HostRequestStatus.cancelled: 680 ↛ 681line 680 didn't jump to line 681 because the condition on line 680 was never true

681 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

682 control_message.host_request_status_target = HostRequestStatus.cancelled 

683 host_request.status = HostRequestStatus.cancelled 

684 session.flush() 

685 

686 notify( 

687 session, 

688 user_id=host_request.recipient_user_id, 

689 topic_action=NotificationTopicAction.host_request__cancel, 

690 key=str(host_request.conversation_id), 

691 data=notification_data_pb2.HostRequestCancel( 

692 host_request=host_request_to_pb(host_request, session, context), 

693 surfer=user_model_to_pb(host_request.initiator, session, context), 

694 ), 

695 moderation_state_id=host_request.moderation_state_id, 

696 ) 

697 

698 count_host_response(host_request.recipient_user_id, "cancelled") 

699 log_event( 

700 context, 

701 session, 

702 "host_request.cancelled", 

703 { 

704 "host_request_id": host_request.conversation_id, 

705 "surfer_id": host_request.initiator_user_id, 

706 "host_id": host_request.recipient_user_id, 

707 "surfer_gender": host_request.initiator.gender, 

708 "host_gender": host_request.recipient.gender, 

709 "from_date": str(host_request.from_date), 

710 "to_date": str(host_request.to_date), 

711 "host_city": host_request.hosting_city, 

712 }, 

713 ) 

714 

715 session.add(control_message) 

716 

717 if request.text: 

718 latest_message = Message( 

719 conversation_id=host_request.conversation_id, 

720 text=request.text, 

721 author_id=context.user_id, 

722 message_type=MessageType.text, 

723 ) 

724 

725 session.add(latest_message) 

726 else: 

727 latest_message = control_message 

728 

729 session.flush() 

730 

731 if host_request.initiator_user_id == context.user_id: 

732 host_request.initiator_last_seen_message_id = latest_message.id 

733 else: 

734 host_request.recipient_last_seen_message_id = latest_message.id 

735 session.commit() 

736 

737 return empty_pb2.Empty() 

738 

def GetHostRequestMessages(
    self, request: requests_pb2.GetHostRequestMessagesReq, context: CouchersContext, session: Session
) -> requests_pb2.GetHostRequestMessagesRes:
    """
    Return one page of messages from a host request's conversation, newest first.

    The host request is looked up by `request.host_request_id` through the moderation visibility filter. The call
    is aborted with NOT_FOUND ("host_request_not_found") when no such request is visible, or when the calling user
    is neither its initiator nor its recipient.

    Paging: `request.number` is the page size (falls back to DEFAULT_PAGINATION_LENGTH when not positive, capped at
    MAX_PAGE_SIZE). When `request.last_message_id` is non-zero only messages with a smaller id are returned.

    The response carries the smallest message id on the page as `last_message_id` (0 when nothing was found) and
    `no_more`, which is true when no further messages exist past this page.
    """
    visible_requests = where_moderated_content_visible(
        select(HostRequest), context, HostRequest, is_list_operation=False
    )
    host_request = session.execute(
        visible_requests.where(HostRequest.conversation_id == request.host_request_id)
    ).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    # Non-participants get the same error as a missing request.
    if context.user_id not in (host_request.initiator_user_id, host_request.recipient_user_id):
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    page_size = min(request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH, MAX_PAGE_SIZE)

    # Ask for one row more than the page so we can tell whether another page exists.
    statement = (
        select(Message)
        .where(Message.conversation_id == host_request.conversation_id)
        .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0)))
        .order_by(Message.id.desc())
        .limit(page_size + 1)
    )
    fetched = session.execute(statement).scalars().all()
    page = fetched[:page_size]

    if len(fetched) > 0:
        last_message_id = min(m.id if m else 1 for m in page)
    else:
        last_message_id = 0

    return requests_pb2.GetHostRequestMessagesRes(
        last_message_id=last_message_id,
        no_more=len(fetched) <= page_size,
        messages=[message_to_pb(m) for m in page],
    )

778 

def SendHostRequestMessage(
    self, request: requests_pb2.SendHostRequestMessageReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Post a text message into a host request's conversation and notify the other party.

    Aborts with INVALID_ARGUMENT ("invalid_message") for an empty text, and with NOT_FOUND
    ("host_request_not_found") when the host request is not visible to the caller or the caller is neither its
    initiator nor its recipient.

    On success the new message is stored, the sender's own last-seen marker is moved to it, a
    `host_request__message` notification is sent to the other participant, the session is committed, and a metric
    plus a "host_request.message_sent" event are recorded.
    """
    if request.text == "":
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

    lookup = where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False).where(
        HostRequest.conversation_id == request.host_request_id
    )
    host_request = session.execute(lookup).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    # Non-participants get the same error as a missing request.
    if context.user_id not in (host_request.initiator_user_id, host_request.recipient_user_id):
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    # Called before the new message is added to the session.
    if host_request.recipient_user_id == context.user_id:
        _possibly_observe_first_response_time(session, host_request, context.user_id, "message")

    message = Message(
        conversation_id=host_request.conversation_id,
        author_id=context.user_id,
        message_type=MessageType.text,
        text=request.text,
    )
    session.add(message)
    # Flush so that message.id is populated before it is used below.
    session.flush()

    sender_is_initiator = host_request.initiator_user_id == context.user_id

    # The sender has, by definition, seen their own message.
    if sender_is_initiator:
        host_request.initiator_last_seen_message_id = message.id
    else:
        host_request.recipient_last_seen_message_id = message.id

    # Notify the other participant; `user` is the sender, and `am_host` is set when the notified party is the
    # recipient of the host request.
    notify(
        session,
        user_id=host_request.recipient_user_id if sender_is_initiator else host_request.initiator_user_id,
        topic_action=NotificationTopicAction.host_request__message,
        key=str(host_request.conversation_id),
        data=notification_data_pb2.HostRequestMessage(
            host_request=host_request_to_pb(host_request, session, context),
            user=user_model_to_pb(
                host_request.initiator if sender_is_initiator else host_request.recipient, session, context
            ),
            text=request.text,
            am_host=sender_is_initiator,
        ),
        moderation_state_id=host_request.moderation_state_id,
    )

    session.commit()

    sender_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
    sent_messages_counter.labels(sender_gender, "host request").inc()

    event_payload = {
        "host_request_id": host_request.conversation_id,
        "surfer_id": host_request.initiator_user_id,
        "host_id": host_request.recipient_user_id,
        "role": "host" if context.user_id == host_request.recipient_user_id else "surfer",
        "host_city": host_request.hosting_city,
    }
    log_event(context, session, "host_request.message_sent", event_payload)

    return empty_pb2.Empty()

861 

def GetHostRequestUpdates(
    self, request: requests_pb2.GetHostRequestUpdatesReq, context: CouchersContext, session: Session
) -> requests_pb2.GetHostRequestUpdatesRes:
    """
    Return messages newer than `request.newest_message_id` across the caller's host requests, oldest first.

    Each update pairs a message with the id and current status of the host request it belongs to. Only host
    requests that pass the moderation visibility filter and in which the caller takes part are considered;
    `only_sent` restricts to requests the caller initiated, `only_received` to requests the caller received.

    Aborts with INVALID_ARGUMENT when both `only_sent` and `only_received` are set
    ("host_request_sent_or_received"), or when `newest_message_id` is 0 or does not match any message
    ("invalid_message").

    `request.number` is the page size (falls back to DEFAULT_PAGINATION_LENGTH when not positive, capped at
    MAX_PAGE_SIZE). `no_more` in the response is true when no further updates exist past this page.
    """
    if request.only_sent and request.only_received:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received")

    if request.newest_message_id == 0:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

    if not session.execute(select(Message).where(Message.id == request.newest_message_id)).scalar_one_or_none():
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

    pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
    pagination = min(pagination, MAX_PAGE_SIZE)

    statement = where_moderated_content_visible(
        select(
            Message,
            HostRequest.status.label("host_request_status"),
            HostRequest.conversation_id.label("host_request_id"),
        )
        .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
        .where(Message.id > request.newest_message_id),
        context,
        HostRequest,
        is_list_operation=False,
    )

    if request.only_sent:
        statement = statement.where(HostRequest.initiator_user_id == context.user_id)
    elif request.only_received:
        statement = statement.where(HostRequest.recipient_user_id == context.user_id)
    else:
        statement = statement.where(
            or_(HostRequest.recipient_user_id == context.user_id, HostRequest.initiator_user_id == context.user_id)
        )

    # Ask for one row more than the page so we can tell whether another page exists.
    statement = statement.order_by(Message.id.asc()).limit(pagination + 1)
    res = session.execute(statement).all()

    no_more = len(res) <= pagination

    # TODO: no paging cursor is returned; callers presumably continue by passing the id of the newest message
    # they received back as `newest_message_id` (confirm against the API definition).

    return requests_pb2.GetHostRequestUpdatesRes(
        no_more=no_more,
        updates=[
            requests_pb2.HostRequestUpdate(
                host_request_id=result.host_request_id,
                status=hostrequeststatus2api[result.host_request_status],
                message=message_to_pb(result.Message),
            )
            for result in res[:pagination]
        ],
    )

917 

def MarkLastSeenHostRequest(
    self, request: requests_pb2.MarkLastSeenHostRequestReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Advance the caller's last-seen message marker on a host request.

    Aborts with NOT_FOUND ("host_request_not_found") when the host request is not visible to the caller or the
    caller is neither its initiator nor its recipient, and with FAILED_PRECONDITION ("cant_unsee_messages") when
    `request.last_seen_message_id` is lower than the marker currently stored for the caller.

    The marker that gets updated is the initiator's or the recipient's, depending on which of the two the caller
    is. The session is committed before returning.
    """
    lookup = where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False).where(
        HostRequest.conversation_id == request.host_request_id
    )
    host_request = session.execute(lookup).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    # Non-participants get the same error as a missing request.
    if context.user_id not in (host_request.initiator_user_id, host_request.recipient_user_id):
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    new_marker = request.last_seen_message_id

    if host_request.initiator_user_id == context.user_id:
        # The marker may only move forward (or stay where it is).
        if host_request.initiator_last_seen_message_id > new_marker:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
        host_request.initiator_last_seen_message_id = new_marker
    else:
        if host_request.recipient_last_seen_message_id > new_marker:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
        host_request.recipient_last_seen_message_id = new_marker

    session.commit()
    return empty_pb2.Empty()

944 

def SetHostRequestArchiveStatus(
    self, request: requests_pb2.SetHostRequestArchiveStatusReq, context: CouchersContext, session: Session
) -> requests_pb2.SetHostRequestArchiveStatusRes:
    """
    Set the caller's own "archived" flag on a host request.

    The host request must pass the moderation visibility filter and have the caller as initiator or recipient;
    otherwise the call is aborted with NOT_FOUND ("host_request_not_found"). Only the flag belonging to the
    caller's side (initiator or recipient) is changed.

    Returns the host request id together with the archive state that was requested.
    """
    lookup = (
        where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False)
        .where(HostRequest.conversation_id == request.host_request_id)
        .where(
            or_(HostRequest.initiator_user_id == context.user_id, HostRequest.recipient_user_id == context.user_id)
        )
    )
    host_request = session.execute(lookup).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    # NOTE(review): there is no explicit commit in this method; persisting the change is presumably left to the
    # surrounding session handling (confirm).
    if context.user_id != host_request.initiator_user_id:
        host_request.is_recipient_archived = request.is_archived
    else:
        host_request.is_initiator_archived = request.is_archived

    return requests_pb2.SetHostRequestArchiveStatusRes(
        host_request_id=host_request.conversation_id,
        is_archived=request.is_archived,
    )

968 

def GetResponseRate(
    self, request: requests_pb2.GetResponseRateReq, context: CouchersContext, session: Session
) -> requests_pb2.GetResponseRateRes:
    """
    Return the response-rate information of the user given by `request.user_id`.

    The user must be visible to the caller (per `users_visible`); otherwise the call is aborted with NOT_FOUND
    ("user_not_found"). The response-rate row is outer-joined, so a visible user without such a row yields `None`
    for it, which is passed on to `response_rate_to_pb` as is.
    """
    lookup = (
        select(User.id, UserResponseRate)
        .outerjoin(UserResponseRate, UserResponseRate.user_id == User.id)
        .where(users_visible(context, User))
        .where(User.id == request.user_id)
    )
    row = session.execute(lookup).one_or_none()

    # No visible user with that id: report NOT_FOUND.
    if not row:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    # The user id is only selected to anchor the outer join; just the rates are needed.
    _user_id, response_rates = row
    return requests_pb2.GetResponseRateRes(**response_rate_to_pb(response_rates))  # type: ignore[arg-type]

985 

def SendHostRequestFeedback(
    self, request: requests_pb2.SendHostRequestFeedbackReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Record the recipient's feedback on a host request they received.

    Only the recipient of the host request may call this: the lookup requires the caller to be the recipient and
    the request to pass the moderation visibility filter, otherwise the call is aborted with NOT_FOUND
    ("host_request_not_found"). Feedback can be left once per host request and user; a second attempt is aborted
    with FAILED_PRECONDITION ("already_left_host_request_feedback").

    On success a HostRequestFeedback row is added to the session and a "host_request.feedback_submitted" event is
    logged.
    """
    lookup = (
        where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False)
        .where(HostRequest.conversation_id == request.host_request_id)
        .where(HostRequest.recipient_user_id == context.user_id)
    )
    host_request = session.execute(lookup).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    existing_feedback = session.execute(
        select(HostRequestFeedback)
        .where(HostRequestFeedback.host_request_id == host_request.conversation_id)
        .where(HostRequestFeedback.from_user_id == context.user_id)
    ).scalar_one_or_none()

    if existing_feedback:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_left_host_request_feedback")

    # Unknown API values map to None rather than raising.
    quality = hostrequestquality2sql.get(request.host_request_quality)

    new_feedback = HostRequestFeedback(
        host_request_id=host_request.conversation_id,
        from_user_id=host_request.recipient_user_id,
        to_user_id=host_request.initiator_user_id,
        request_quality=quality,
        decline_reason=request.decline_reason,
    )
    session.add(new_feedback)

    event_payload = {
        "host_request_id": host_request.conversation_id,
        "surfer_id": host_request.initiator_user_id,
        "host_id": host_request.recipient_user_id,
        "request_quality": quality.name if quality else None,
        "has_decline_reason": bool(request.decline_reason),
        "host_city": host_request.hosting_city,
    }
    log_event(context, session, "host_request.feedback_submitted", event_payload)

    return empty_pb2.Empty()