Coverage for app/backend/src/couchers/servicers/requests.py: 92%

339 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy import exists, select 

7from sqlalchemy.orm import Session, aliased 

8from sqlalchemy.sql import and_, func, or_ 

9 

10from couchers.constants import HOST_REQUEST_MIN_LENGTH_UTF16 

11from couchers.context import CouchersContext, make_notification_user_context 

12from couchers.db import can_moderate_node 

13from couchers.event_log import log_event 

14from couchers.helpers.completed_profile import has_completed_profile 

15from couchers.materialized_views import UserResponseRate 

16from couchers.metrics import ( 

17 account_age_on_host_request_create_histogram, 

18 host_request_first_response_histogram, 

19 host_request_responses_counter, 

20 host_requests_sent_counter, 

21 sent_messages_counter, 

22) 

23from couchers.models import ( 

24 Conversation, 

25 HostRequest, 

26 HostRequestFeedback, 

27 HostRequestQuality, 

28 HostRequestStatus, 

29 Message, 

30 MessageType, 

31 ModerationObjectType, 

32 RateLimitAction, 

33 User, 

34) 

35from couchers.models.notifications import NotificationTopicAction 

36from couchers.models.public_trips import PublicTrip, PublicTripStatus 

37from couchers.moderation.utils import create_moderation 

38from couchers.notifications.notify import mark_notifications_seen, notify 

39from couchers.proto import conversations_pb2, notification_data_pb2, requests_pb2, requests_pb2_grpc 

40from couchers.rate_limits.check import process_rate_limits_and_check_abort 

41from couchers.rate_limits.definitions import RATE_LIMIT_HOURS 

42from couchers.servicers.api import response_rate_to_pb, user_model_to_pb 

43from couchers.sql import to_bool, users_visible, where_moderated_content_visible, where_users_column_visible 

44from couchers.utils import ( 

45 Timestamp_from_datetime, 

46 date_to_api, 

47 get_coordinates, 

48 now, 

49 parse_date, 

50 today_in_timezone, 

51) 

52 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Page size used when a request does not ask for a positive number of items.
DEFAULT_PAGINATION_LENGTH = 10
# Upper bound applied to whatever page size was requested.
MAX_PAGE_SIZE = 50

57 

58 

# Database HostRequestStatus -> protobuf host request status value.
hostrequeststatus2api = {
    HostRequestStatus.pending: conversations_pb2.HOST_REQUEST_STATUS_PENDING,
    HostRequestStatus.accepted: conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
    HostRequestStatus.rejected: conversations_pb2.HOST_REQUEST_STATUS_REJECTED,
    HostRequestStatus.confirmed: conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED,
    HostRequestStatus.cancelled: conversations_pb2.HOST_REQUEST_STATUS_CANCELLED,
}

# Protobuf host request status value -> database HostRequestStatus.
# Built by inverting the table above (which is one-to-one), so both directions always agree.
api2hostrequeststatus = {api_status: db_status for db_status, api_status in hostrequeststatus2api.items()}

74 

# Protobuf host request quality value -> database HostRequestQuality.
# Each named quality level maps to the database member of the same name.
# NOTE(review): UNSPECIFIED maps to high_quality; presumably a deliberate default -- confirm.
hostrequestquality2sql = {
    requests_pb2.HOST_REQUEST_QUALITY_UNSPECIFIED: HostRequestQuality.high_quality,
    requests_pb2.HOST_REQUEST_QUALITY_LOW: HostRequestQuality.low_quality,
    requests_pb2.HOST_REQUEST_QUALITY_OKAY: HostRequestQuality.okay_quality,
}

80 

81 

def message_to_pb(message: Message) -> conversations_pb2.Message:
    """
    Convert the given message into its protocol buffer form.

    A normal message carries its text. Any other message carries a `chat_created` payload
    and/or a `host_request_status_changed` payload, depending on its message type.
    """
    # Fields that every kind of message shares.
    common_fields = {
        "message_id": message.id,
        "author_user_id": message.author_id,
        "time": Timestamp_from_datetime(message.time),
    }

    if message.is_normal_message:
        return conversations_pb2.Message(
            **common_fields,
            text=conversations_pb2.MessageContentText(text=message.text),
        )

    # Control message: populate only the payload matching the message type.
    chat_created = None
    if message.message_type == MessageType.chat_created:
        chat_created = conversations_pb2.MessageContentChatCreated()

    status_changed = None
    if message.message_type == MessageType.host_request_status_changed:
        status_changed = conversations_pb2.MessageContentHostRequestStatusChanged(
            status=hostrequeststatus2api[message.host_request_status_target]  # type: ignore[index]
        )

    return conversations_pb2.Message(
        **common_fields,
        chat_created=chat_created,
        host_request_status_changed=status_changed,
    )

111 

112 

def host_request_to_pb(
    host_request: HostRequest, session: Session, context: CouchersContext
) -> requests_pb2.HostRequest:
    """
    Build the protocol buffer view of a host request as seen by `context.user_id`.

    `created` is the time of the conversation's first message (lowest id) and `latest_message`
    is the message with the highest id. The last-seen message id and the archived flag are
    taken from the side of the request (initiator or recipient) that matches the viewer.
    `need_host_request_feedback` is set only when the viewer is the recipient, the request is
    rejected, and no HostRequestFeedback row from the viewer exists for this request.
    """
    conversation_messages = select(Message).where(Message.conversation_id == host_request.conversation_id)

    initial_message = session.execute(conversation_messages.order_by(Message.id.asc()).limit(1)).scalar_one()
    latest_message = session.execute(conversation_messages.order_by(Message.id.desc()).limit(1)).scalar_one()

    lat, lng = get_coordinates(host_request.hosting_location)

    need_feedback = False
    if context.user_id == host_request.recipient_user_id and host_request.status == HostRequestStatus.rejected:
        feedback_exists = session.execute(
            select(
                exists().where(
                    HostRequestFeedback.from_user_id == context.user_id,
                    HostRequestFeedback.host_request_id == host_request.conversation_id,
                )
            )
        ).scalar_one()
        need_feedback = not feedback_exists

    if context.user_id == host_request.initiator_user_id:
        last_seen_message_id = host_request.initiator_last_seen_message_id
    else:
        last_seen_message_id = host_request.recipient_last_seen_message_id

    if context.user_id == host_request.recipient_user_id:
        is_archived = host_request.is_recipient_archived
    else:
        is_archived = host_request.is_initiator_archived

    return requests_pb2.HostRequest(
        host_request_id=host_request.conversation_id,
        surfer_user_id=host_request.initiator_user_id,
        host_user_id=host_request.recipient_user_id,
        status=hostrequeststatus2api[host_request.status],
        created=Timestamp_from_datetime(initial_message.time),
        from_date=date_to_api(host_request.from_date),
        to_date=date_to_api(host_request.to_date),
        last_seen_message_id=last_seen_message_id,
        latest_message=message_to_pb(latest_message),
        hosting_city=host_request.hosting_city,
        hosting_lat=lat,
        hosting_lng=lng,
        hosting_radius=host_request.hosting_radius,
        need_host_request_feedback=need_feedback,
        is_archived=is_archived,
        public_trip_id=host_request.public_trip_id,
    )

169 

170 

def _possibly_observe_first_response_time(
    session: Session, host_request: HostRequest, user_id: int, response_type: str
) -> None:
    """
    Record the host's first-response time if `user_id` has not yet written in the conversation.

    `user_id` must be the recipient of `host_request`. When the recipient has no messages in
    the conversation yet, the seconds elapsed since the conversation was created are observed
    on the first-response histogram, labelled with host gender, surfer gender and
    `response_type`.
    """
    assert host_request.recipient_user_id == user_id

    # if this is the first response then there's nothing by this user yet
    messages_by_host = select(func.count()).where(
        Message.conversation_id == host_request.conversation_id,
        Message.author_id == user_id,
    )
    if session.execute(messages_by_host).scalar_one_or_none() != 0:
        return

    host_gender = session.execute(select(User.gender).where(User.id == host_request.recipient_user_id)).scalar_one()
    surfer_gender = session.execute(
        select(User.gender).where(User.id == host_request.initiator_user_id)
    ).scalar_one()

    histogram = host_request_first_response_histogram.labels(host_gender, surfer_gender, response_type)
    histogram.observe((now() - host_request.conversation.created).total_seconds())

191 

192 

def _is_host_request_long_enough(text: str) -> bool:
    """
    Return whether `text` has at least HOST_REQUEST_MIN_LENGTH_UTF16 UTF-16 code units.
    """
    # Python's len(str) counts Unicode code points, while Javascript's string.length counts
    # UTF-16 code units, so the two disagree for characters outside the Basic Multilingual Plane:
    # the single code point U+1F600 has len() == 1 in Python but .length == 2 in Javascript.
    # To match the frontend's validation, measure the string in UTF-16 code units.
    # Every UTF-16 code unit is two bytes, and utf-16-le does not include a prefix BOM code unit.
    utf16_code_units = len(text.encode("utf-16-le")) // 2
    return utf16_code_units >= HOST_REQUEST_MIN_LENGTH_UTF16

199 

200 

201class Requests(requests_pb2_grpc.RequestsServicer): 

def CreateHostRequest(
    self, request: requests_pb2.CreateHostRequestReq, context: CouchersContext, session: Session
) -> requests_pb2.CreateHostRequestRes:
    """
    Create a host request from the calling user to `request.host_user_id`.

    Checks, in order: the caller's profile is complete, the caller is not requesting
    themselves, the recipient exists and is visible, the dates parse and are in a valid range
    (relative to today in the recipient's timezone), the text is long enough, the caller is
    not rate limited, and - when `public_trip_id` is set - that the offer is valid for that
    public trip. Each failed check aborts the call with an error code.

    On success it creates the conversation with a `chat_created` message and the request text,
    a moderation state and the HostRequest row, notifies the recipient, updates metrics and
    logs a "host_request.created" event.

    Returns a response carrying the new host request id (the conversation id).
    """
    sender = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
    if not has_completed_profile(session, sender):
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_request")

    if request.host_user_id == context.user_id:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_request_self")

    # just to check recipient exists and is visible
    recipient_query = select(User).where(users_visible(context, User)).where(User.id == request.host_user_id)
    recipient = session.execute(recipient_query).scalar_one_or_none()
    if not recipient:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    from_date = parse_date(request.from_date)
    to_date = parse_date(request.to_date)

    if not (from_date and to_date):
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_date")

    today = today_in_timezone(recipient.timezone)

    # request starts from the past
    if from_date < today:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_before_today")

    # the stay must end strictly after it starts
    if from_date >= to_date:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_to")

    # No need to check today > to_date

    one_year = timedelta(days=365)

    if from_date - today > one_year:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_one_year")

    if to_date - from_date > one_year:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_to_after_one_year")

    # Check minimum length
    if not _is_host_request_long_enough(request.text):
        context.abort_with_error_code(
            grpc.StatusCode.INVALID_ARGUMENT,
            "host_request_too_short2",
            substitutions={"count": HOST_REQUEST_MIN_LENGTH_UTF16},
        )

    # Check if user has been sending host requests excessively
    rate_limited = process_rate_limits_and_check_abort(
        session=session, user_id=context.user_id, action=RateLimitAction.host_request
    )
    if rate_limited:
        context.abort_with_error_code(
            grpc.StatusCode.RESOURCE_EXHAUSTED,
            "host_request_rate_limit2",
            substitutions={"count": RATE_LIMIT_HOURS},
        )

    # If this is an offer in response to a public trip, validate it
    public_trip_id = None
    if request.HasField("public_trip_id"):
        public_trip_id = request.public_trip_id

    if public_trip_id is not None:
        public_trip = session.execute(
            select(PublicTrip).where(PublicTrip.id == public_trip_id)
        ).scalar_one_or_none()
        if not public_trip:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "public_trip_not_found")
        # The trip's traveler must be the recipient of this host request (role reversal)
        if public_trip.user_id != recipient.id:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "public_trip_user_mismatch")
        # Trip must still be active
        if public_trip.status != PublicTripStatus.searching_for_host:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "public_trip_not_active")
        # Offered dates must fall within the trip's window (host can shorten, not extend)
        if from_date < public_trip.from_date or to_date > public_trip.to_date:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "public_trip_dates_out_of_range")
        # Enforce same_gender_only restriction (community moderators bypass)
        if (
            public_trip.same_gender_only
            and not can_moderate_node(session, context.user_id, public_trip.node_id)
            and sender.gender != recipient.gender
        ):
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "public_trip_same_gender_only")
        # Prevent duplicate offers on the same trip
        existing_offer = session.execute(
            select(HostRequest)
            .where(HostRequest.public_trip_id == public_trip_id)
            .where(HostRequest.initiator_user_id == context.user_id)
        ).scalar_one_or_none()
        if existing_offer:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "duplicate_host_request_for_trip")

    # Flush so the conversation has an id before messages reference it.
    conversation = Conversation()
    session.add(conversation)
    session.flush()

    chat_created_message = Message(
        conversation_id=conversation.id,
        author_id=context.user_id,
        message_type=MessageType.chat_created,
    )
    session.add(chat_created_message)

    request_message = Message(
        conversation_id=conversation.id,
        author_id=context.user_id,
        text=request.text,
        message_type=MessageType.text,
    )
    session.add(request_message)
    session.flush()

    # Create moderation state for UMS (starts as SHADOWED)
    moderation_state = create_moderation(
        session=session,
        object_type=ModerationObjectType.host_request,
        object_id=conversation.id,
        creator_user_id=context.user_id,
    )

    host_request = HostRequest(
        conversation_id=conversation.id,
        initiator_user_id=context.user_id,
        recipient_user_id=recipient.id,
        moderation_state_id=moderation_state.id,
        from_date=from_date,
        to_date=to_date,
        status=HostRequestStatus.pending,
        initiator_last_seen_message_id=request_message.id,
        # TODO: tz
        # timezone=recipient.timezone,
        hosting_city=recipient.city,
        hosting_location=recipient.geom,
        hosting_radius=recipient.geom_radius,
        public_trip_id=public_trip_id,
    )
    session.add(host_request)
    session.flush()

    # The notification payload is rendered from the recipient's point of view.
    recipient_context = make_notification_user_context(user_id=host_request.recipient_user_id)
    notification_data = notification_data_pb2.HostRequestCreate(
        host_request=host_request_to_pb(host_request, session, recipient_context),
        surfer=user_model_to_pb(host_request.initiator, session, recipient_context),
        text=request.text,
    )
    notify(
        session,
        user_id=host_request.recipient_user_id,
        topic_action=NotificationTopicAction.host_request__create,
        key=str(host_request.conversation_id),
        data=notification_data,
        moderation_state_id=moderation_state.id,
    )

    host_requests_sent_counter.labels(sender.gender, recipient.gender).inc()
    sent_messages_counter.labels(sender.gender, "host request send").inc()
    account_age_on_host_request_create_histogram.labels(sender.gender, recipient.gender).observe(
        (now() - sender.joined).total_seconds()
    )

    event_payload = {
        "host_request_id": host_request.conversation_id,
        "host_id": recipient.id,
        "surfer_gender": sender.gender,
        "host_gender": recipient.gender,
        "city": recipient.city,
        "from_date": str(from_date),
        "to_date": str(to_date),
        "nights": (to_date - from_date).days,
    }
    log_event(context, session, "host_request.created", event_payload)

    return requests_pb2.CreateHostRequestRes(host_request_id=host_request.conversation_id)

378 

def GetHostRequest(
    self, request: requests_pb2.GetHostRequestReq, context: CouchersContext, session: Session
) -> requests_pb2.HostRequest:
    """
    Return a single host request in which the calling user is the initiator or the recipient.

    The lookup goes through the user-visibility filters for both participants and the
    moderation-visibility filter (as a non-list operation). Aborts with NOT_FOUND when no
    matching row is found.
    """
    statement = select(HostRequest)
    statement = where_users_column_visible(statement, context, HostRequest.initiator_user_id)
    statement = where_users_column_visible(statement, context, HostRequest.recipient_user_id)
    statement = where_moderated_content_visible(statement, context, HostRequest, is_list_operation=False)

    caller_is_participant = or_(
        HostRequest.initiator_user_id == context.user_id, HostRequest.recipient_user_id == context.user_id
    )
    statement = statement.where(HostRequest.conversation_id == request.host_request_id).where(
        caller_is_participant
    )

    host_request = session.execute(statement).scalar_one_or_none()
    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    return host_request_to_pb(host_request, session, context)

407 

def ListHostRequests(
    self, request: requests_pb2.ListHostRequestsReq, context: CouchersContext, session: Session
) -> requests_pb2.ListHostRequestsRes:
    """
    List the calling user's host requests, one page at a time, each with its latest message.

    Results are ordered either by (from_date, conversation id) ascending, when
    `request.sort_by` is HOST_REQUEST_SORT_BY_FROM_DATE, or by latest message id descending.
    The page token format follows the ordering: "<from_date>:<conversation id>" for the former
    and a message id for the latter. The page size is `request.number` when positive, else
    DEFAULT_PAGINATION_LENGTH, capped at MAX_PAGE_SIZE.

    Aborts with INVALID_ARGUMENT when both `only_sent` and `only_received` are set.
    """
    if request.only_sent and request.only_received:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received")

    requested_size = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
    pagination = min(requested_size, MAX_PAGE_SIZE)

    # By outer joining messages on itself where the second id is bigger, only the highest IDs will have
    # none as message_2.id. So just filter for these to get the highest messages only.
    # See https://stackoverflow.com/a/27802817/6115336
    message_2 = aliased(Message)
    statement = (
        select(Message, HostRequest, Conversation)
        .outerjoin(
            message_2, and_(Message.conversation_id == message_2.conversation_id, Message.id < message_2.id)
        )
        .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
        .join(Conversation, Conversation.id == HostRequest.conversation_id)
    )
    statement = where_users_column_visible(statement, context, HostRequest.initiator_user_id)
    statement = where_users_column_visible(statement, context, HostRequest.recipient_user_id)
    statement = where_moderated_content_visible(statement, context, HostRequest, is_list_operation=True)
    statement = statement.where(message_2.id == None)  # noqa: E711

    sort_by_from_date = request.sort_by == requests_pb2.HOST_REQUEST_SORT_BY_FROM_DATE

    # Resume after the position encoded in the page token, if one was given.
    if request.page_token:
        if sort_by_from_date:
            token_date_str, token_conv_id_str = request.page_token.split(":")
            token_date = parse_date(token_date_str)
            token_conv_id = int(token_conv_id_str)
            statement = statement.where(
                or_(
                    HostRequest.from_date > token_date,
                    and_(
                        HostRequest.from_date == token_date,
                        HostRequest.conversation_id > token_conv_id,
                    ),
                )
            )
        else:
            statement = statement.where(Message.id < int(request.page_token))

    # Restrict to requests the caller takes part in, on the requested side.
    if request.only_sent:
        statement = statement.where(HostRequest.initiator_user_id == context.user_id)
    elif request.only_received:
        statement = statement.where(HostRequest.recipient_user_id == context.user_id)
    elif request.HasField("only_archived"):
        archived_as_initiator = and_(
            HostRequest.initiator_user_id == context.user_id,
            HostRequest.is_initiator_archived == request.only_archived,
        )
        archived_as_recipient = and_(
            HostRequest.recipient_user_id == context.user_id,
            HostRequest.is_recipient_archived == request.only_archived,
        )
        statement = statement.where(or_(archived_as_initiator, archived_as_recipient))
    else:
        statement = statement.where(
            or_(HostRequest.recipient_user_id == context.user_id, HostRequest.initiator_user_id == context.user_id)
        )

    # TODO: I considered having the latest control message be the single source of truth for
    # the HostRequest.status, but decided against it because of this filter.
    # Another possibility is to filter in the python instead of SQL, but that's slower
    if request.only_active:
        statement = statement.where(
            or_(
                HostRequest.status == HostRequestStatus.pending,
                HostRequest.status == HostRequestStatus.accepted,
                HostRequest.status == HostRequestStatus.confirmed,
            )
        )
        statement = statement.where(HostRequest.end_time >= func.now())

    if request.status_in:
        wanted_statuses = [api2hostrequeststatus[s] for s in request.status_in]
        statement = statement.where(HostRequest.status.in_(wanted_statuses))

    if sort_by_from_date:
        statement = statement.order_by(HostRequest.from_date.asc(), HostRequest.conversation_id.asc())
    else:
        statement = statement.order_by(Message.id.desc())

    # Fetch one row more than the page size to learn whether another page exists.
    results = session.execute(statement.limit(pagination + 1)).all()
    page = results[:pagination]

    host_requests = []
    for row in page:
        row_request = row.HostRequest
        lat, lng = get_coordinates(row_request.hosting_location)
        if context.user_id == row_request.initiator_user_id:
            last_seen_message_id = row_request.initiator_last_seen_message_id
        else:
            last_seen_message_id = row_request.recipient_last_seen_message_id
        host_requests.append(
            requests_pb2.HostRequest(
                host_request_id=row_request.conversation_id,
                surfer_user_id=row_request.initiator_user_id,
                host_user_id=row_request.recipient_user_id,
                status=hostrequeststatus2api[row_request.status],
                created=Timestamp_from_datetime(row.Conversation.created),
                from_date=date_to_api(row_request.from_date),
                to_date=date_to_api(row_request.to_date),
                last_seen_message_id=last_seen_message_id,
                latest_message=message_to_pb(row.Message),
                hosting_city=row_request.hosting_city,
                hosting_lat=lat,
                hosting_lng=lng,
                hosting_radius=row_request.hosting_radius,
            )
        )

    has_more = len(results) > pagination

    next_page_token = None
    if has_more:
        if sort_by_from_date:
            last = page[-1]
            next_page_token = f"{date_to_api(last.HostRequest.from_date)}:{last.HostRequest.conversation_id}"
        else:
            next_page_token = str(min(row.Message.id for row in page))

    return requests_pb2.ListHostRequestsRes(
        next_page_token=next_page_token, no_more=not has_more, host_requests=host_requests
    )

545 

def RespondHostRequest(
    self, request: requests_pb2.RespondHostRequestReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Move a host request to a new status on behalf of the calling user.

    Rules enforced here:
    - the caller must be the initiator or the recipient, otherwise NOT_FOUND;
    - PENDING is never a valid target (PERMISSION_DENIED);
    - a request whose end_time has passed cannot be changed (INVALID_ARGUMENT);
    - ACCEPTED: recipient only; not from cancelled, confirmed or accepted;
    - REJECTED: recipient only; not from cancelled or rejected;
    - CONFIRMED: initiator only; only from accepted;
    - CANCELLED: initiator only; not from rejected or cancelled.

    A successful transition notifies the other party, updates metrics and logs an event. A
    status-change control message is then added, followed by an optional text message, and the
    caller's last-seen message id is advanced to the newest of these before committing.
    """

    def count_host_response(other_user_id: int, response_type: str) -> None:
        # Bump the response counters, labelled by the genders of the caller and the other party.
        user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
        other_gender = session.execute(select(User.gender).where(User.id == other_user_id)).scalar_one()
        host_request_responses_counter.labels(user_gender, other_gender, response_type).inc()
        sent_messages_counter.labels(user_gender, "host request response").inc()

    def event_payload() -> dict:
        # Details attached to every host_request.* event logged below.
        return {
            "host_request_id": host_request.conversation_id,
            "surfer_id": host_request.initiator_user_id,
            "host_id": host_request.recipient_user_id,
            "surfer_gender": host_request.initiator.gender,
            "host_gender": host_request.recipient.gender,
            "from_date": str(host_request.from_date),
            "to_date": str(host_request.to_date),
            "host_city": host_request.hosting_city,
        }

    statement = select(HostRequest)
    statement = where_users_column_visible(statement, context, HostRequest.initiator_user_id)
    statement = where_users_column_visible(statement, context, HostRequest.recipient_user_id)
    statement = where_moderated_content_visible(statement, context, HostRequest, is_list_operation=False)
    statement = statement.where(HostRequest.conversation_id == request.host_request_id)
    host_request = session.execute(statement).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    if context.user_id not in (host_request.initiator_user_id, host_request.recipient_user_id):
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    if request.status == conversations_pb2.HOST_REQUEST_STATUS_PENDING:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")

    if host_request.end_time < now():
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_in_past")

    # Created up front, but only added to the session after the transition has been handled.
    control_message = Message(
        message_type=MessageType.host_request_status_changed,
        conversation_id=host_request.conversation_id,
        author_id=context.user_id,
    )

    if request.status == conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED:
        # only host can accept
        if context.user_id != host_request.recipient_user_id:
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "not_the_host")
        # can't accept a cancelled or confirmed request (only reject), or already accepted
        if host_request.status in (
            HostRequestStatus.cancelled,
            HostRequestStatus.confirmed,
            HostRequestStatus.accepted,
        ):
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
        _possibly_observe_first_response_time(session, host_request, context.user_id, "accepted")
        control_message.host_request_status_target = HostRequestStatus.accepted
        host_request.status = HostRequestStatus.accepted
        session.flush()

        notification_context = make_notification_user_context(user_id=host_request.initiator_user_id)
        notify(
            session,
            user_id=host_request.initiator_user_id,
            topic_action=NotificationTopicAction.host_request__accept,
            key=str(host_request.conversation_id),
            data=notification_data_pb2.HostRequestAccept(
                host_request=host_request_to_pb(host_request, session, notification_context),
                host=user_model_to_pb(host_request.recipient, session, notification_context),
            ),
            moderation_state_id=host_request.moderation_state_id,
        )

        count_host_response(host_request.initiator_user_id, "accepted")
        log_event(context, session, "host_request.accepted", event_payload())

    elif request.status == conversations_pb2.HOST_REQUEST_STATUS_REJECTED:
        # only host can reject
        if context.user_id != host_request.recipient_user_id:
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
        # can't reject a cancelled or already rejected request
        if host_request.status in (HostRequestStatus.cancelled, HostRequestStatus.rejected):
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
        _possibly_observe_first_response_time(session, host_request, context.user_id, "rejected")
        control_message.host_request_status_target = HostRequestStatus.rejected
        host_request.status = HostRequestStatus.rejected
        session.flush()

        notification_context = make_notification_user_context(user_id=host_request.initiator_user_id)
        notify(
            session,
            user_id=host_request.initiator_user_id,
            topic_action=NotificationTopicAction.host_request__reject,
            key=str(host_request.conversation_id),
            data=notification_data_pb2.HostRequestReject(
                host_request=host_request_to_pb(host_request, session, notification_context),
                host=user_model_to_pb(host_request.recipient, session, notification_context),
            ),
            moderation_state_id=host_request.moderation_state_id,
        )

        count_host_response(host_request.initiator_user_id, "rejected")
        log_event(context, session, "host_request.rejected", event_payload())

    elif request.status == conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED:
        # only surfer can confirm
        if context.user_id != host_request.initiator_user_id:
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
        # can only confirm an accepted request
        if host_request.status != HostRequestStatus.accepted:
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
        control_message.host_request_status_target = HostRequestStatus.confirmed
        host_request.status = HostRequestStatus.confirmed
        session.flush()

        notification_context = make_notification_user_context(user_id=host_request.recipient_user_id)
        notify(
            session,
            user_id=host_request.recipient_user_id,
            topic_action=NotificationTopicAction.host_request__confirm,
            key=str(host_request.conversation_id),
            data=notification_data_pb2.HostRequestConfirm(
                host_request=host_request_to_pb(host_request, session, notification_context),
                surfer=user_model_to_pb(host_request.initiator, session, notification_context),
            ),
            moderation_state_id=host_request.moderation_state_id,
        )

        count_host_response(host_request.recipient_user_id, "confirmed")
        log_event(context, session, "host_request.confirmed", event_payload())

    elif request.status == conversations_pb2.HOST_REQUEST_STATUS_CANCELLED:
        # only surfer can cancel
        if context.user_id != host_request.initiator_user_id:
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
        # can't cancel an already cancelled or rejected request
        if host_request.status in (HostRequestStatus.rejected, HostRequestStatus.cancelled):
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
        control_message.host_request_status_target = HostRequestStatus.cancelled
        host_request.status = HostRequestStatus.cancelled
        session.flush()

        notification_context = make_notification_user_context(user_id=host_request.recipient_user_id)
        notify(
            session,
            user_id=host_request.recipient_user_id,
            topic_action=NotificationTopicAction.host_request__cancel,
            key=str(host_request.conversation_id),
            data=notification_data_pb2.HostRequestCancel(
                host_request=host_request_to_pb(host_request, session, notification_context),
                surfer=user_model_to_pb(host_request.initiator, session, notification_context),
            ),
            moderation_state_id=host_request.moderation_state_id,
        )

        count_host_response(host_request.recipient_user_id, "cancelled")
        log_event(context, session, "host_request.cancelled", event_payload())

    session.add(control_message)

    # The newest message is the optional text if one was sent, otherwise the control message.
    latest_message = control_message
    if request.text:
        latest_message = Message(
            conversation_id=host_request.conversation_id,
            text=request.text,
            author_id=context.user_id,
            message_type=MessageType.text,
        )
        session.add(latest_message)

    # Flush so the new messages have ids before they are recorded as last seen.
    session.flush()

    if host_request.initiator_user_id == context.user_id:
        host_request.initiator_last_seen_message_id = latest_message.id
    else:
        host_request.recipient_last_seen_message_id = latest_message.id
    session.commit()

    return empty_pb2.Empty()

784 

def GetHostRequestMessages(
    self, request: requests_pb2.GetHostRequestMessagesReq, context: CouchersContext, session: Session
) -> requests_pb2.GetHostRequestMessagesRes:
    """Return one page of a host request's messages, newest first.

    Only the initiator or the recipient of the host request may read it. The
    call aborts with NOT_FOUND when the request does not exist, is filtered
    out by the moderation visibility check, or the caller is not a participant.

    Paging walks backwards through message ids: ``request.last_message_id`` is
    an exclusive upper bound, and ``0`` means "start from the newest message".
    ``request.number`` picks the page size (DEFAULT_PAGINATION_LENGTH when not
    positive, never more than MAX_PAGE_SIZE).
    """
    visible_requests = where_moderated_content_visible(
        select(HostRequest), context, HostRequest, is_list_operation=False
    )
    host_request = session.execute(
        visible_requests.where(HostRequest.conversation_id == request.host_request_id)
    ).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    # Non-participants get the same NOT_FOUND as for a missing request.
    if context.user_id not in (host_request.initiator_user_id, host_request.recipient_user_id):
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    page_size = min(request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH, MAX_PAGE_SIZE)

    before_cursor = or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0))
    query = (
        select(Message)
        .where(Message.conversation_id == host_request.conversation_id)
        .where(before_cursor)
        .order_by(Message.id.desc())
        # one extra row is fetched only to learn whether another page exists
        .limit(page_size + 1)
    )
    fetched = session.execute(query).scalars().all()
    page = fetched[:page_size]

    return requests_pb2.GetHostRequestMessagesRes(
        # smallest id on this page; the client passes it back as the next cursor
        last_message_id=min((m.id if m else 1) for m in page) if fetched else 0,
        no_more=len(fetched) <= page_size,
        messages=[message_to_pb(m) for m in page],
    )

824 

def SendHostRequestMessage(
    self, request: requests_pb2.SendHostRequestMessageReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Post a text message in a host request conversation and notify the other party.

    Aborts with INVALID_ARGUMENT when the text is empty, and with NOT_FOUND
    when the host request does not exist, is filtered out by the moderation
    visibility check, or the caller is neither its initiator nor its recipient.
    """
    if request.text == "":
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

    visible_requests = where_moderated_content_visible(
        select(HostRequest), context, HostRequest, is_list_operation=False
    )
    host_request = session.execute(
        visible_requests.where(HostRequest.conversation_id == request.host_request_id)
    ).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    if context.user_id not in (host_request.initiator_user_id, host_request.recipient_user_id):
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    # A message written by the recipient (host) is passed to the first-response-time helper.
    if host_request.recipient_user_id == context.user_id:
        _possibly_observe_first_response_time(session, host_request, context.user_id, "message")

    message = Message(
        conversation_id=host_request.conversation_id,
        author_id=context.user_id,
        message_type=MessageType.text,
        text=request.text,
    )
    session.add(message)
    # flush so that message.id can be read below
    session.flush()

    # The author has seen their own message; the notification goes to the other participant.
    sent_by_initiator = host_request.initiator_user_id == context.user_id
    if sent_by_initiator:
        host_request.initiator_last_seen_message_id = message.id
        notified_user_id = host_request.recipient_user_id
    else:
        host_request.recipient_last_seen_message_id = message.id
        notified_user_id = host_request.initiator_user_id

    notified_context = make_notification_user_context(user_id=notified_user_id)
    notify(
        session,
        user_id=notified_user_id,
        topic_action=NotificationTopicAction.host_request__message,
        key=str(host_request.conversation_id),
        data=notification_data_pb2.HostRequestMessage(
            host_request=host_request_to_pb(host_request, session, notified_context),
            user=user_model_to_pb(
                host_request.initiator if sent_by_initiator else host_request.recipient,
                session,
                notified_context,
            ),
            text=request.text,
            # True when the notified user is the recipient of the host request
            am_host=sent_by_initiator,
        ),
        moderation_state_id=host_request.moderation_state_id,
    )

    session.commit()

    author_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
    sent_messages_counter.labels(author_gender, "host request").inc()

    event_payload = {
        "host_request_id": host_request.conversation_id,
        "surfer_id": host_request.initiator_user_id,
        "host_id": host_request.recipient_user_id,
        "role": "host" if context.user_id == host_request.recipient_user_id else "surfer",
        "host_city": host_request.hosting_city,
    }
    log_event(context, session, "host_request.message_sent", event_payload)

    return empty_pb2.Empty()

909 

def GetHostRequestUpdates(
    self, request: requests_pb2.GetHostRequestUpdatesReq, context: CouchersContext, session: Session
) -> requests_pb2.GetHostRequestUpdatesRes:
    """Return messages newer than ``request.newest_message_id`` from the caller's host requests.

    ``only_sent`` restricts the result to host requests the caller initiated,
    ``only_received`` to those the caller received; with neither set, both are
    included. Updates are ordered by ascending message id and capped at
    ``request.number`` (DEFAULT_PAGINATION_LENGTH when not positive, never more
    than MAX_PAGE_SIZE). ``no_more`` reports whether further updates remain.

    Aborts with INVALID_ARGUMENT when both filters are set, when
    ``newest_message_id`` is 0, or when no message with that id exists.
    """
    if request.only_sent and request.only_received:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received")

    if request.newest_message_id == 0:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

    # The cursor must refer to an existing message.
    if not session.execute(select(Message).where(Message.id == request.newest_message_id)).scalar_one_or_none():
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

    pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
    pagination = min(pagination, MAX_PAGE_SIZE)

    # NOTE(review): this query can return many rows but passes is_list_operation=False;
    # confirm that is the intended visibility mode for this endpoint.
    statement = where_moderated_content_visible(
        select(
            Message,
            HostRequest.status.label("host_request_status"),
            HostRequest.conversation_id.label("host_request_id"),
        )
        .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
        .where(Message.id > request.newest_message_id),
        context,
        HostRequest,
        is_list_operation=False,
    )

    if request.only_sent:
        statement = statement.where(HostRequest.initiator_user_id == context.user_id)
    elif request.only_received:
        statement = statement.where(HostRequest.recipient_user_id == context.user_id)
    else:
        statement = statement.where(
            or_(HostRequest.recipient_user_id == context.user_id, HostRequest.initiator_user_id == context.user_id)
        )

    # One extra row is fetched only to learn whether more updates exist beyond this page.
    statement = statement.order_by(Message.id.asc()).limit(pagination + 1)
    res = session.execute(statement).all()

    no_more = len(res) <= pagination

    return requests_pb2.GetHostRequestUpdatesRes(
        no_more=no_more,
        updates=[
            requests_pb2.HostRequestUpdate(
                host_request_id=result.host_request_id,
                status=hostrequeststatus2api[result.host_request_status],
                message=message_to_pb(result.Message),
            )
            for result in res[:pagination]
        ],
    )

965 

def MarkLastSeenHostRequest(
    self, request: requests_pb2.MarkLastSeenHostRequestReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Advance the caller's last-seen message marker on a host request.

    The marker can only move forward: asking for an id lower than the stored
    one aborts with FAILED_PRECONDITION. Aborts with NOT_FOUND when the host
    request does not exist, is filtered out by the moderation visibility
    check, or the caller is not a participant. The caller's host request
    notifications for this conversation are marked as seen as well.
    """
    visible_requests = where_moderated_content_visible(
        select(HostRequest), context, HostRequest, is_list_operation=False
    )
    host_request = session.execute(
        visible_requests.where(HostRequest.conversation_id == request.host_request_id)
    ).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    if context.user_id not in (host_request.initiator_user_id, host_request.recipient_user_id):
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    seen_up_to = request.last_seen_message_id
    if host_request.initiator_user_id == context.user_id:
        if host_request.initiator_last_seen_message_id > seen_up_to:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
        host_request.initiator_last_seen_message_id = seen_up_to
    else:
        if host_request.recipient_last_seen_message_id > seen_up_to:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
        host_request.recipient_last_seen_message_id = seen_up_to

    # Every host request notification type keyed by this conversation is cleared.
    host_request_topics = [
        NotificationTopicAction.host_request__create,
        NotificationTopicAction.host_request__accept,
        NotificationTopicAction.host_request__reject,
        NotificationTopicAction.host_request__confirm,
        NotificationTopicAction.host_request__cancel,
        NotificationTopicAction.host_request__message,
        NotificationTopicAction.host_request__missed_messages,
        NotificationTopicAction.host_request__reminder,
    ]
    mark_notifications_seen(
        session,
        user_id=context.user_id,
        key=str(host_request.conversation_id),
        topic_actions=host_request_topics,
    )

    session.commit()
    return empty_pb2.Empty()

1008 

def SetHostRequestArchiveStatus(
    self, request: requests_pb2.SetHostRequestArchiveStatusReq, context: CouchersContext, session: Session
) -> requests_pb2.SetHostRequestArchiveStatusRes:
    """Set the caller's own archived flag on a host request.

    Initiator and recipient each have a separate flag; only the one belonging
    to the caller is written. Aborts with NOT_FOUND when no visible host
    request with the given id has the caller as a participant.

    Returns the host request id together with the archive state that was set.
    """
    caller_is_participant = or_(
        HostRequest.initiator_user_id == context.user_id, HostRequest.recipient_user_id == context.user_id
    )
    host_request = session.execute(
        where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False)
        .where(HostRequest.conversation_id == request.host_request_id)
        .where(caller_is_participant)
    ).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    archived = request.is_archived
    if context.user_id == host_request.initiator_user_id:
        host_request.is_initiator_archived = archived
    else:
        host_request.is_recipient_archived = archived

    return requests_pb2.SetHostRequestArchiveStatusRes(
        host_request_id=host_request.conversation_id,
        is_archived=archived,
    )

1032 

def GetResponseRate(
    self, request: requests_pb2.GetResponseRateReq, context: CouchersContext, session: Session
) -> requests_pb2.GetResponseRateRes:
    """Return the response-rate data for ``request.user_id``.

    Aborts with NOT_FOUND when the user does not exist or is not visible to
    the caller.
    """
    query = (
        select(User.id, UserResponseRate)
        # outer join: the user row is still returned when it has no UserResponseRate entry
        .outerjoin(UserResponseRate, UserResponseRate.user_id == User.id)
        .where(users_visible(context, User))
        .where(User.id == request.user_id)
    )
    row = session.execute(query).one_or_none()

    if not row:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    # Only the response-rate half of the row is needed; the id just proves the user exists.
    _user_id, response_rates = row
    return requests_pb2.GetResponseRateRes(**response_rate_to_pb(response_rates))  # type: ignore[arg-type]

1049 

def SendHostRequestFeedback(
    self, request: requests_pb2.SendHostRequestFeedbackReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Record the recipient's feedback about a host request they received.

    The host request is looked up with the caller as its recipient, so only
    the recipient can leave feedback. Aborts with NOT_FOUND when no such
    visible host request exists, and with FAILED_PRECONDITION when the caller
    has already left feedback for it.
    """
    received_request_query = (
        where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False)
        .where(HostRequest.conversation_id == request.host_request_id)
        .where(HostRequest.recipient_user_id == context.user_id)
    )
    host_request = session.execute(received_request_query).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    existing_feedback = session.execute(
        select(HostRequestFeedback)
        .where(HostRequestFeedback.host_request_id == host_request.conversation_id)
        .where(HostRequestFeedback.from_user_id == context.user_id)
    ).scalar_one_or_none()

    if existing_feedback:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_left_host_request_feedback")

    # .get() yields None for a quality value that has no entry in the mapping
    quality = hostrequestquality2sql.get(request.host_request_quality)

    feedback = HostRequestFeedback(
        host_request_id=host_request.conversation_id,
        from_user_id=host_request.recipient_user_id,
        to_user_id=host_request.initiator_user_id,
        request_quality=quality,
        decline_reason=request.decline_reason,
    )
    session.add(feedback)

    event_payload = {
        "host_request_id": host_request.conversation_id,
        "surfer_id": host_request.initiator_user_id,
        "host_id": host_request.recipient_user_id,
        "request_quality": quality.name if quality else None,
        "has_decline_reason": bool(request.decline_reason),
        "host_city": host_request.hosting_city,
    }
    log_event(context, session, "host_request.feedback_submitted", event_payload)

    return empty_pb2.Empty()