Coverage for app / backend / src / couchers / servicers / requests.py: 91%

295 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-19 14:14 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy import exists, select 

7from sqlalchemy.orm import Session, aliased 

8from sqlalchemy.sql import and_, func, or_ 

9 

10from couchers.constants import HOST_REQUEST_MIN_LENGTH_UTF16 

11from couchers.context import CouchersContext 

12from couchers.event_log import log_event 

13from couchers.helpers.completed_profile import has_completed_profile 

14from couchers.materialized_views import UserResponseRate 

15from couchers.metrics import ( 

16 account_age_on_host_request_create_histogram, 

17 host_request_first_response_histogram, 

18 host_request_responses_counter, 

19 host_requests_sent_counter, 

20 sent_messages_counter, 

21) 

22from couchers.models import ( 

23 Conversation, 

24 HostRequest, 

25 HostRequestFeedback, 

26 HostRequestQuality, 

27 HostRequestStatus, 

28 Message, 

29 MessageType, 

30 ModerationObjectType, 

31 RateLimitAction, 

32 User, 

33) 

34from couchers.models.notifications import NotificationTopicAction 

35from couchers.moderation.utils import create_moderation 

36from couchers.notifications.notify import notify 

37from couchers.proto import conversations_pb2, notification_data_pb2, requests_pb2, requests_pb2_grpc 

38from couchers.rate_limits.check import process_rate_limits_and_check_abort 

39from couchers.rate_limits.definitions import RATE_LIMIT_HOURS 

40from couchers.servicers.api import response_rate_to_pb, user_model_to_pb 

41from couchers.sql import to_bool, users_visible, where_moderated_content_visible, where_users_column_visible 

42from couchers.utils import ( 

43 Timestamp_from_datetime, 

44 date_to_api, 

45 get_coordinates, 

46 now, 

47 parse_date, 

48 today_in_timezone, 

49) 

50 

51logger = logging.getLogger(__name__) 

52 

53DEFAULT_PAGINATION_LENGTH = 10 

54MAX_PAGE_SIZE = 50 

55 

56 

57hostrequeststatus2api = { 

58 HostRequestStatus.pending: conversations_pb2.HOST_REQUEST_STATUS_PENDING, 

59 HostRequestStatus.accepted: conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED, 

60 HostRequestStatus.rejected: conversations_pb2.HOST_REQUEST_STATUS_REJECTED, 

61 HostRequestStatus.confirmed: conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED, 

62 HostRequestStatus.cancelled: conversations_pb2.HOST_REQUEST_STATUS_CANCELLED, 

63} 

64 

65hostrequestquality2sql = { 

66 requests_pb2.HOST_REQUEST_QUALITY_UNSPECIFIED: HostRequestQuality.high_quality, 

67 requests_pb2.HOST_REQUEST_QUALITY_LOW: HostRequestQuality.okay_quality, 

68 requests_pb2.HOST_REQUEST_QUALITY_OKAY: HostRequestQuality.low_quality, 

69} 

70 

71 

72def message_to_pb(message: Message) -> conversations_pb2.Message: 

73 """ 

74 Turns the given message to a protocol buffer 

75 """ 

76 if message.is_normal_message: 

77 return conversations_pb2.Message( 

78 message_id=message.id, 

79 author_user_id=message.author_id, 

80 time=Timestamp_from_datetime(message.time), 

81 text=conversations_pb2.MessageContentText(text=message.text), 

82 ) 

83 else: 

84 return conversations_pb2.Message( 

85 message_id=message.id, 

86 author_user_id=message.author_id, 

87 time=Timestamp_from_datetime(message.time), 

88 chat_created=( 

89 conversations_pb2.MessageContentChatCreated() 

90 if message.message_type == MessageType.chat_created 

91 else None 

92 ), 

93 host_request_status_changed=( 

94 conversations_pb2.MessageContentHostRequestStatusChanged( 

95 status=hostrequeststatus2api[message.host_request_status_target] # type: ignore[index] 

96 ) 

97 if message.message_type == MessageType.host_request_status_changed 

98 else None 

99 ), 

100 ) 

101 

102 

103def host_request_to_pb( 

104 host_request: HostRequest, session: Session, context: CouchersContext 

105) -> requests_pb2.HostRequest: 

106 initial_message = session.execute( 

107 select(Message) 

108 .where(Message.conversation_id == host_request.conversation_id) 

109 .order_by(Message.id.asc()) 

110 .limit(1) 

111 ).scalar_one() 

112 

113 latest_message = session.execute( 

114 select(Message) 

115 .where(Message.conversation_id == host_request.conversation_id) 

116 .order_by(Message.id.desc()) 

117 .limit(1) 

118 ).scalar_one() 

119 

120 lat, lng = get_coordinates(host_request.hosting_location) 

121 

122 need_feedback = False 

123 if context.user_id == host_request.host_user_id and host_request.status == HostRequestStatus.rejected: 

124 need_feedback = not session.execute( 

125 select( 

126 exists().where( 

127 HostRequestFeedback.from_user_id == context.user_id, 

128 HostRequestFeedback.host_request_id == host_request.conversation_id, 

129 ) 

130 ) 

131 ).scalar_one() 

132 

133 return requests_pb2.HostRequest( 

134 host_request_id=host_request.conversation_id, 

135 surfer_user_id=host_request.surfer_user_id, 

136 host_user_id=host_request.host_user_id, 

137 status=hostrequeststatus2api[host_request.status], 

138 created=Timestamp_from_datetime(initial_message.time), 

139 from_date=date_to_api(host_request.from_date), 

140 to_date=date_to_api(host_request.to_date), 

141 last_seen_message_id=( 

142 host_request.surfer_last_seen_message_id 

143 if context.user_id == host_request.surfer_user_id 

144 else host_request.host_last_seen_message_id 

145 ), 

146 latest_message=message_to_pb(latest_message), 

147 hosting_city=host_request.hosting_city, 

148 hosting_lat=lat, 

149 hosting_lng=lng, 

150 hosting_radius=host_request.hosting_radius, 

151 need_host_request_feedback=need_feedback, 

152 is_archived=( 

153 host_request.is_host_archived 

154 if context.user_id == host_request.host_user_id 

155 else host_request.is_surfer_archived 

156 ), 

157 ) 

158 

159 

160def _possibly_observe_first_response_time( 

161 session: Session, host_request: HostRequest, user_id: int, response_type: str 

162) -> None: 

163 # if this is the first response then there's nothing by this user yet 

164 assert host_request.host_user_id == user_id 

165 

166 number_messages_by_host = session.execute( 

167 select(func.count()) 

168 .where(Message.conversation_id == host_request.conversation_id) 

169 .where(Message.author_id == user_id) 

170 ).scalar_one_or_none() 

171 

172 if number_messages_by_host == 0: 

173 host_gender = session.execute(select(User.gender).where(User.id == host_request.host_user_id)).scalar_one() 

174 surfer_gender = session.execute(select(User.gender).where(User.id == host_request.surfer_user_id)).scalar_one() 

175 host_request_first_response_histogram.labels(host_gender, surfer_gender, response_type).observe( 

176 (now() - host_request.conversation.created).total_seconds() 

177 ) 

178 

179 

180def _is_host_request_long_enough(text: str) -> bool: 

181 # Python's len(str) does not match Javascript's string.length. 

182 # e.g. len("é") == 2 but "é".length == 1. 

183 # To match the frontend's validation, measure the string in utf16 code units. 

184 text_length_utf16 = len(text.encode("utf-16-le")) // 2 # utf-16-le does not include a prefix BOM code unit. 

185 return text_length_utf16 >= HOST_REQUEST_MIN_LENGTH_UTF16 

186 

187 

188class Requests(requests_pb2_grpc.RequestsServicer): 

189 def CreateHostRequest( 

190 self, request: requests_pb2.CreateHostRequestReq, context: CouchersContext, session: Session 

191 ) -> requests_pb2.CreateHostRequestRes: 

192 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

193 if not has_completed_profile(session, user): 

194 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_request") 

195 

196 if request.host_user_id == context.user_id: 

197 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_request_self") 

198 

199 # just to check host exists and is visible 

200 host = session.execute( 

201 select(User).where(users_visible(context, User)).where(User.id == request.host_user_id) 

202 ).scalar_one_or_none() 

203 if not host: 

204 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

205 

206 from_date = parse_date(request.from_date) 

207 to_date = parse_date(request.to_date) 

208 

209 if not from_date or not to_date: 

210 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_date") 

211 

212 today = today_in_timezone(host.timezone) 

213 

214 # request starts from the past 

215 if from_date < today: 

216 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_before_today") 

217 

218 # from_date is not >= to_date 

219 if from_date >= to_date: 

220 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_to") 

221 

222 # No need to check today > to_date 

223 

224 if from_date - today > timedelta(days=365): 

225 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_one_year") 

226 

227 if to_date - from_date > timedelta(days=365): 

228 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_to_after_one_year") 

229 

230 # Check minimum length 

231 if not _is_host_request_long_enough(request.text): 

232 context.abort_with_error_code( 

233 grpc.StatusCode.INVALID_ARGUMENT, 

234 "host_request_too_short", 

235 substitutions={"chars": str(HOST_REQUEST_MIN_LENGTH_UTF16)}, 

236 ) 

237 

238 # Check if user has been sending host requests excessively 

239 if process_rate_limits_and_check_abort( 

240 session=session, user_id=context.user_id, action=RateLimitAction.host_request 

241 ): 

242 context.abort_with_error_code( 

243 grpc.StatusCode.RESOURCE_EXHAUSTED, 

244 "host_request_rate_limit", 

245 substitutions={"hours": str(RATE_LIMIT_HOURS)}, 

246 ) 

247 

248 conversation = Conversation() 

249 session.add(conversation) 

250 session.flush() 

251 

252 session.add( 

253 Message( 

254 conversation_id=conversation.id, 

255 author_id=context.user_id, 

256 message_type=MessageType.chat_created, 

257 ) 

258 ) 

259 

260 message = Message( 

261 conversation_id=conversation.id, 

262 author_id=context.user_id, 

263 text=request.text, 

264 message_type=MessageType.text, 

265 ) 

266 session.add(message) 

267 session.flush() 

268 

269 # Create moderation state for UMS (starts as SHADOWED) 

270 moderation_state = create_moderation( 

271 session=session, 

272 object_type=ModerationObjectType.host_request, 

273 object_id=conversation.id, 

274 creator_user_id=context.user_id, 

275 ) 

276 

277 host_request = HostRequest( 

278 conversation_id=conversation.id, 

279 surfer_user_id=context.user_id, 

280 host_user_id=host.id, 

281 moderation_state_id=moderation_state.id, 

282 from_date=from_date, 

283 to_date=to_date, 

284 status=HostRequestStatus.pending, 

285 surfer_last_seen_message_id=message.id, 

286 # TODO: tz 

287 # timezone=host.timezone, 

288 hosting_city=host.city, 

289 hosting_location=host.geom, 

290 hosting_radius=host.geom_radius, 

291 ) 

292 session.add(host_request) 

293 session.flush() 

294 

295 notify( 

296 session, 

297 user_id=host_request.host_user_id, 

298 topic_action=NotificationTopicAction.host_request__create, 

299 key=str(host_request.conversation_id), 

300 data=notification_data_pb2.HostRequestCreate( 

301 host_request=host_request_to_pb(host_request, session, context), 

302 surfer=user_model_to_pb(host_request.surfer, session, context), 

303 text=request.text, 

304 ), 

305 moderation_state_id=moderation_state.id, 

306 ) 

307 

308 host_requests_sent_counter.labels(user.gender, host.gender).inc() 

309 sent_messages_counter.labels(user.gender, "host request send").inc() 

310 account_age_on_host_request_create_histogram.labels(user.gender, host.gender).observe( 

311 (now() - user.joined).total_seconds() 

312 ) 

313 log_event( 

314 context, 

315 session, 

316 "host_request.created", 

317 { 

318 "host_request_id": host_request.conversation_id, 

319 "host_id": host.id, 

320 "surfer_gender": user.gender, 

321 "host_gender": host.gender, 

322 "city": host.city, 

323 "from_date": str(from_date), 

324 "to_date": str(to_date), 

325 "nights": (to_date - from_date).days, 

326 }, 

327 ) 

328 

329 return requests_pb2.CreateHostRequestRes(host_request_id=host_request.conversation_id) 

330 

331 def GetHostRequest( 

332 self, request: requests_pb2.GetHostRequestReq, context: CouchersContext, session: Session 

333 ) -> requests_pb2.HostRequest: 

334 host_request = session.execute( 

335 where_moderated_content_visible( 

336 where_users_column_visible( 

337 where_users_column_visible( 

338 select(HostRequest), 

339 context, 

340 HostRequest.surfer_user_id, 

341 ), 

342 context, 

343 HostRequest.host_user_id, 

344 ), 

345 context, 

346 HostRequest, 

347 is_list_operation=False, 

348 ) 

349 .where(HostRequest.conversation_id == request.host_request_id) 

350 .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id)) 

351 ).scalar_one_or_none() 

352 

353 if not host_request: 

354 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

355 

356 return host_request_to_pb(host_request, session, context) 

357 

358 def ListHostRequests( 

359 self, request: requests_pb2.ListHostRequestsReq, context: CouchersContext, session: Session 

360 ) -> requests_pb2.ListHostRequestsRes: 

361 if request.only_sent and request.only_received: 361 ↛ 362line 361 didn't jump to line 362 because the condition on line 361 was never true

362 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received") 

363 

364 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH 

365 pagination = min(pagination, MAX_PAGE_SIZE) 

366 

367 # By outer joining messages on itself where the second id is bigger, only the highest IDs will have 

368 # none as message_2.id. So just filter for these to get the highest messages only. 

369 # See https://stackoverflow.com/a/27802817/6115336 

370 message_2 = aliased(Message) 

371 statement = where_moderated_content_visible( 

372 where_users_column_visible( 

373 where_users_column_visible( 

374 select(Message, HostRequest, Conversation) 

375 .outerjoin( 

376 message_2, and_(Message.conversation_id == message_2.conversation_id, Message.id < message_2.id) 

377 ) 

378 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id) 

379 .join(Conversation, Conversation.id == HostRequest.conversation_id), 

380 context, 

381 HostRequest.surfer_user_id, 

382 ), 

383 context, 

384 HostRequest.host_user_id, 

385 ), 

386 context, 

387 HostRequest, 

388 is_list_operation=True, 

389 ).where(message_2.id == None) 

390 

391 if request.last_request_id != 0: 

392 statement = statement.where(Message.id < request.last_request_id) 

393 if request.only_sent: 

394 statement = statement.where(HostRequest.surfer_user_id == context.user_id) 

395 elif request.only_received: 

396 statement = statement.where(HostRequest.host_user_id == context.user_id) 

397 elif request.HasField("only_archived"): 

398 statement = statement.where( 

399 or_( 

400 and_( 

401 HostRequest.surfer_user_id == context.user_id, 

402 HostRequest.is_surfer_archived == request.only_archived, 

403 ), 

404 and_( 

405 HostRequest.host_user_id == context.user_id, 

406 HostRequest.is_host_archived == request.only_archived, 

407 ), 

408 ) 

409 ) 

410 else: 

411 statement = statement.where( 

412 or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id) 

413 ) 

414 

415 # TODO: I considered having the latest control message be the single source of truth for 

416 # the HostRequest.status, but decided against it because of this filter. 

417 # Another possibility is to filter in the python instead of SQL, but that's slower 

418 if request.only_active: 

419 statement = statement.where( 

420 or_( 

421 HostRequest.status == HostRequestStatus.pending, 

422 HostRequest.status == HostRequestStatus.accepted, 

423 HostRequest.status == HostRequestStatus.confirmed, 

424 ) 

425 ) 

426 statement = statement.where(HostRequest.end_time <= func.now()) 

427 

428 statement = statement.order_by(Message.id.desc()).limit(pagination + 1) 

429 results = session.execute(statement).all() 

430 

431 host_requests = [] 

432 for result in results[:pagination]: 

433 lat, lng = get_coordinates(result.HostRequest.hosting_location) 

434 host_requests.append( 

435 requests_pb2.HostRequest( 

436 host_request_id=result.HostRequest.conversation_id, 

437 surfer_user_id=result.HostRequest.surfer_user_id, 

438 host_user_id=result.HostRequest.host_user_id, 

439 status=hostrequeststatus2api[result.HostRequest.status], 

440 created=Timestamp_from_datetime(result.Conversation.created), 

441 from_date=date_to_api(result.HostRequest.from_date), 

442 to_date=date_to_api(result.HostRequest.to_date), 

443 last_seen_message_id=( 

444 result.HostRequest.surfer_last_seen_message_id 

445 if context.user_id == result.HostRequest.surfer_user_id 

446 else result.HostRequest.host_last_seen_message_id 

447 ), 

448 latest_message=message_to_pb(result.Message), 

449 hosting_city=result.HostRequest.hosting_city, 

450 hosting_lat=lat, 

451 hosting_lng=lng, 

452 hosting_radius=result.HostRequest.hosting_radius, 

453 ) 

454 ) 

455 

456 last_request_id = min(g.Message.id for g in results[:pagination]) if len(results) > pagination else 0 # TODO 

457 no_more = len(results) <= pagination 

458 

459 return requests_pb2.ListHostRequestsRes( 

460 last_request_id=last_request_id, no_more=no_more, host_requests=host_requests 

461 ) 

462 

463 def RespondHostRequest( 

464 self, request: requests_pb2.RespondHostRequestReq, context: CouchersContext, session: Session 

465 ) -> empty_pb2.Empty: 

466 def count_host_response(other_user_id: int, response_type: str) -> None: 

467 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one() 

468 other_gender = session.execute(select(User.gender).where(User.id == other_user_id)).scalar_one() 

469 host_request_responses_counter.labels(user_gender, other_gender, response_type).inc() 

470 sent_messages_counter.labels(user_gender, "host request response").inc() 

471 

472 host_request = session.execute( 

473 where_moderated_content_visible( 

474 where_users_column_visible( 

475 where_users_column_visible( 

476 select(HostRequest), 

477 context, 

478 HostRequest.surfer_user_id, 

479 ), 

480 context, 

481 HostRequest.host_user_id, 

482 ), 

483 context, 

484 HostRequest, 

485 is_list_operation=False, 

486 ).where(HostRequest.conversation_id == request.host_request_id) 

487 ).scalar_one_or_none() 

488 

489 if not host_request: 

490 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

491 

492 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 

493 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

494 

495 if request.status == conversations_pb2.HOST_REQUEST_STATUS_PENDING: 

496 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

497 

498 if host_request.end_time < now(): 498 ↛ 499line 498 didn't jump to line 499 because the condition on line 498 was never true

499 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_in_past") 

500 

501 control_message = Message( 

502 message_type=MessageType.host_request_status_changed, 

503 conversation_id=host_request.conversation_id, 

504 author_id=context.user_id, 

505 ) 

506 

507 if request.status == conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED: 

508 # only host can accept 

509 if context.user_id != host_request.host_user_id: 

510 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "not_the_host") 

511 # can't accept a cancelled or confirmed request (only reject), or already accepted 

512 if ( 512 ↛ 517line 512 didn't jump to line 517 because the condition on line 512 was never true

513 host_request.status == HostRequestStatus.cancelled 

514 or host_request.status == HostRequestStatus.confirmed 

515 or host_request.status == HostRequestStatus.accepted 

516 ): 

517 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

518 _possibly_observe_first_response_time(session, host_request, context.user_id, "accepted") 

519 control_message.host_request_status_target = HostRequestStatus.accepted 

520 host_request.status = HostRequestStatus.accepted 

521 session.flush() 

522 

523 notify( 

524 session, 

525 user_id=host_request.surfer_user_id, 

526 topic_action=NotificationTopicAction.host_request__accept, 

527 key=str(host_request.conversation_id), 

528 data=notification_data_pb2.HostRequestAccept( 

529 host_request=host_request_to_pb(host_request, session, context), 

530 host=user_model_to_pb(host_request.host, session, context), 

531 ), 

532 moderation_state_id=host_request.moderation_state_id, 

533 ) 

534 

535 count_host_response(host_request.surfer_user_id, "accepted") 

536 log_event( 

537 context, 

538 session, 

539 "host_request.accepted", 

540 { 

541 "host_request_id": host_request.conversation_id, 

542 "surfer_id": host_request.surfer_user_id, 

543 "host_id": host_request.host_user_id, 

544 "surfer_gender": host_request.surfer.gender, 

545 "host_gender": host_request.host.gender, 

546 "from_date": str(host_request.from_date), 

547 "to_date": str(host_request.to_date), 

548 "host_city": host_request.hosting_city, 

549 }, 

550 ) 

551 

552 if request.status == conversations_pb2.HOST_REQUEST_STATUS_REJECTED: 

553 # only host can reject 

554 if context.user_id != host_request.host_user_id: 554 ↛ 555line 554 didn't jump to line 555 because the condition on line 554 was never true

555 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

556 # can't reject a cancelled or already rejected request 

557 if host_request.status == HostRequestStatus.cancelled or host_request.status == HostRequestStatus.rejected: 557 ↛ 558line 557 didn't jump to line 558 because the condition on line 557 was never true

558 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

559 _possibly_observe_first_response_time(session, host_request, context.user_id, "rejected") 

560 control_message.host_request_status_target = HostRequestStatus.rejected 

561 host_request.status = HostRequestStatus.rejected 

562 session.flush() 

563 

564 notify( 

565 session, 

566 user_id=host_request.surfer_user_id, 

567 topic_action=NotificationTopicAction.host_request__reject, 

568 key=str(host_request.conversation_id), 

569 data=notification_data_pb2.HostRequestReject( 

570 host_request=host_request_to_pb(host_request, session, context), 

571 host=user_model_to_pb(host_request.host, session, context), 

572 ), 

573 moderation_state_id=host_request.moderation_state_id, 

574 ) 

575 

576 count_host_response(host_request.surfer_user_id, "rejected") 

577 log_event( 

578 context, 

579 session, 

580 "host_request.rejected", 

581 { 

582 "host_request_id": host_request.conversation_id, 

583 "surfer_id": host_request.surfer_user_id, 

584 "host_id": host_request.host_user_id, 

585 "surfer_gender": host_request.surfer.gender, 

586 "host_gender": host_request.host.gender, 

587 "from_date": str(host_request.from_date), 

588 "to_date": str(host_request.to_date), 

589 "host_city": host_request.hosting_city, 

590 }, 

591 ) 

592 

593 if request.status == conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED: 

594 # only surfer can confirm 

595 if context.user_id != host_request.surfer_user_id: 

596 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

597 # can only confirm an accepted request 

598 if host_request.status != HostRequestStatus.accepted: 

599 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

600 control_message.host_request_status_target = HostRequestStatus.confirmed 

601 host_request.status = HostRequestStatus.confirmed 

602 session.flush() 

603 

604 notify( 

605 session, 

606 user_id=host_request.host_user_id, 

607 topic_action=NotificationTopicAction.host_request__confirm, 

608 key=str(host_request.conversation_id), 

609 data=notification_data_pb2.HostRequestConfirm( 

610 host_request=host_request_to_pb(host_request, session, context), 

611 surfer=user_model_to_pb(host_request.surfer, session, context), 

612 ), 

613 moderation_state_id=host_request.moderation_state_id, 

614 ) 

615 

616 count_host_response(host_request.host_user_id, "confirmed") 

617 log_event( 

618 context, 

619 session, 

620 "host_request.confirmed", 

621 { 

622 "host_request_id": host_request.conversation_id, 

623 "surfer_id": host_request.surfer_user_id, 

624 "host_id": host_request.host_user_id, 

625 "surfer_gender": host_request.surfer.gender, 

626 "host_gender": host_request.host.gender, 

627 "from_date": str(host_request.from_date), 

628 "to_date": str(host_request.to_date), 

629 "host_city": host_request.hosting_city, 

630 }, 

631 ) 

632 

633 if request.status == conversations_pb2.HOST_REQUEST_STATUS_CANCELLED: 

634 # only surfer can cancel 

635 if context.user_id != host_request.surfer_user_id: 

636 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

637 # can't' cancel an already cancelled or rejected request 

638 if host_request.status == HostRequestStatus.rejected or host_request.status == HostRequestStatus.cancelled: 638 ↛ 639line 638 didn't jump to line 639 because the condition on line 638 was never true

639 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

640 control_message.host_request_status_target = HostRequestStatus.cancelled 

641 host_request.status = HostRequestStatus.cancelled 

642 session.flush() 

643 

644 notify( 

645 session, 

646 user_id=host_request.host_user_id, 

647 topic_action=NotificationTopicAction.host_request__cancel, 

648 key=str(host_request.conversation_id), 

649 data=notification_data_pb2.HostRequestCancel( 

650 host_request=host_request_to_pb(host_request, session, context), 

651 surfer=user_model_to_pb(host_request.surfer, session, context), 

652 ), 

653 moderation_state_id=host_request.moderation_state_id, 

654 ) 

655 

656 count_host_response(host_request.host_user_id, "cancelled") 

657 log_event( 

658 context, 

659 session, 

660 "host_request.cancelled", 

661 { 

662 "host_request_id": host_request.conversation_id, 

663 "surfer_id": host_request.surfer_user_id, 

664 "host_id": host_request.host_user_id, 

665 "surfer_gender": host_request.surfer.gender, 

666 "host_gender": host_request.host.gender, 

667 "from_date": str(host_request.from_date), 

668 "to_date": str(host_request.to_date), 

669 "host_city": host_request.hosting_city, 

670 }, 

671 ) 

672 

673 session.add(control_message) 

674 

675 if request.text: 

676 latest_message = Message( 

677 conversation_id=host_request.conversation_id, 

678 text=request.text, 

679 author_id=context.user_id, 

680 message_type=MessageType.text, 

681 ) 

682 

683 session.add(latest_message) 

684 else: 

685 latest_message = control_message 

686 

687 session.flush() 

688 

689 if host_request.surfer_user_id == context.user_id: 

690 host_request.surfer_last_seen_message_id = latest_message.id 

691 else: 

692 host_request.host_last_seen_message_id = latest_message.id 

693 session.commit() 

694 

695 return empty_pb2.Empty() 

696 

697 def GetHostRequestMessages( 

698 self, request: requests_pb2.GetHostRequestMessagesReq, context: CouchersContext, session: Session 

699 ) -> requests_pb2.GetHostRequestMessagesRes: 

700 host_request = session.execute( 

701 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False).where( 

702 HostRequest.conversation_id == request.host_request_id 

703 ) 

704 ).scalar_one_or_none() 

705 

706 if not host_request: 706 ↛ 707line 706 didn't jump to line 707 because the condition on line 706 was never true

707 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

708 

709 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 709 ↛ 710line 709 didn't jump to line 710 because the condition on line 709 was never true

710 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

711 

712 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH 

713 pagination = min(pagination, MAX_PAGE_SIZE) 

714 

715 messages = ( 

716 session.execute( 

717 select(Message) 

718 .where(Message.conversation_id == host_request.conversation_id) 

719 .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0))) 

720 .order_by(Message.id.desc()) 

721 .limit(pagination + 1) 

722 ) 

723 .scalars() 

724 .all() 

725 ) 

726 

727 no_more = len(messages) <= pagination 

728 

729 last_message_id = min(m.id if m else 1 for m in messages[:pagination]) if len(messages) > 0 else 0 

730 

731 return requests_pb2.GetHostRequestMessagesRes( 

732 last_message_id=last_message_id, 

733 no_more=no_more, 

734 messages=[message_to_pb(message) for message in messages[:pagination]], 

735 ) 

736 

737 def SendHostRequestMessage( 

738 self, request: requests_pb2.SendHostRequestMessageReq, context: CouchersContext, session: Session 

739 ) -> empty_pb2.Empty: 

740 if request.text == "": 

741 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message") 

742 host_request = session.execute( 

743 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False).where( 

744 HostRequest.conversation_id == request.host_request_id 

745 ) 

746 ).scalar_one_or_none() 

747 

748 if not host_request: 

749 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

750 

751 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 

752 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

753 

754 if host_request.host_user_id == context.user_id: 

755 _possibly_observe_first_response_time(session, host_request, context.user_id, "message") 

756 

757 message = Message( 

758 conversation_id=host_request.conversation_id, 

759 author_id=context.user_id, 

760 message_type=MessageType.text, 

761 text=request.text, 

762 ) 

763 

764 session.add(message) 

765 session.flush() 

766 

767 if host_request.surfer_user_id == context.user_id: 

768 host_request.surfer_last_seen_message_id = message.id 

769 

770 notify( 

771 session, 

772 user_id=host_request.host_user_id, 

773 topic_action=NotificationTopicAction.host_request__message, 

774 key=str(host_request.conversation_id), 

775 data=notification_data_pb2.HostRequestMessage( 

776 host_request=host_request_to_pb(host_request, session, context), 

777 user=user_model_to_pb(host_request.surfer, session, context), 

778 text=request.text, 

779 am_host=True, 

780 ), 

781 moderation_state_id=host_request.moderation_state_id, 

782 ) 

783 

784 else: 

785 host_request.host_last_seen_message_id = message.id 

786 

787 notify( 

788 session, 

789 user_id=host_request.surfer_user_id, 

790 topic_action=NotificationTopicAction.host_request__message, 

791 key=str(host_request.conversation_id), 

792 data=notification_data_pb2.HostRequestMessage( 

793 host_request=host_request_to_pb(host_request, session, context), 

794 user=user_model_to_pb(host_request.host, session, context), 

795 text=request.text, 

796 am_host=False, 

797 ), 

798 moderation_state_id=host_request.moderation_state_id, 

799 ) 

800 

801 session.commit() 

802 

803 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one() 

804 sent_messages_counter.labels(user_gender, "host request").inc() 

805 log_event( 

806 context, 

807 session, 

808 "host_request.message_sent", 

809 { 

810 "host_request_id": host_request.conversation_id, 

811 "surfer_id": host_request.surfer_user_id, 

812 "host_id": host_request.host_user_id, 

813 "role": "host" if context.user_id == host_request.host_user_id else "surfer", 

814 "host_city": host_request.hosting_city, 

815 }, 

816 ) 

817 

818 return empty_pb2.Empty() 

819 

820 def GetHostRequestUpdates( 

821 self, request: requests_pb2.GetHostRequestUpdatesReq, context: CouchersContext, session: Session 

822 ) -> requests_pb2.GetHostRequestUpdatesRes: 

823 if request.only_sent and request.only_received: 823 ↛ 824line 823 didn't jump to line 824 because the condition on line 823 was never true

824 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received") 

825 

826 if request.newest_message_id == 0: 

827 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message") 

828 

829 if not session.execute(select(Message).where(Message.id == request.newest_message_id)).scalar_one_or_none(): 829 ↛ 830line 829 didn't jump to line 830 because the condition on line 829 was never true

830 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message") 

831 

832 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH 

833 pagination = min(pagination, MAX_PAGE_SIZE) 

834 

835 statement = where_moderated_content_visible( 

836 select( 

837 Message, 

838 HostRequest.status.label("host_request_status"), 

839 HostRequest.conversation_id.label("host_request_id"), 

840 ) 

841 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id) 

842 .where(Message.id > request.newest_message_id), 

843 context, 

844 HostRequest, 

845 is_list_operation=False, 

846 ) 

847 

848 if request.only_sent: 848 ↛ 849line 848 didn't jump to line 849 because the condition on line 848 was never true

849 statement = statement.where(HostRequest.surfer_user_id == context.user_id) 

850 elif request.only_received: 850 ↛ 851line 850 didn't jump to line 851 because the condition on line 850 was never true

851 statement = statement.where(HostRequest.host_user_id == context.user_id) 

852 else: 

853 statement = statement.where( 

854 or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id) 

855 ) 

856 

857 statement = statement.order_by(Message.id.asc()).limit(pagination + 1) 

858 res = session.execute(statement).all() 

859 

860 no_more = len(res) <= pagination 

861 

862 last_message_id = min(m.Message.id if m else 1 for m in res[:pagination]) if len(res) > 0 else 0 # TODO 

863 

864 return requests_pb2.GetHostRequestUpdatesRes( 

865 no_more=no_more, 

866 updates=[ 

867 requests_pb2.HostRequestUpdate( 

868 host_request_id=result.host_request_id, 

869 status=hostrequeststatus2api[result.host_request_status], 

870 message=message_to_pb(result.Message), 

871 ) 

872 for result in res[:pagination] 

873 ], 

874 ) 

875 

876 def MarkLastSeenHostRequest( 

877 self, request: requests_pb2.MarkLastSeenHostRequestReq, context: CouchersContext, session: Session 

878 ) -> empty_pb2.Empty: 

879 host_request = session.execute( 

880 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False).where( 

881 HostRequest.conversation_id == request.host_request_id 

882 ) 

883 ).scalar_one_or_none() 

884 

885 if not host_request: 885 ↛ 886line 885 didn't jump to line 886 because the condition on line 885 was never true

886 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

887 

888 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 888 ↛ 889line 888 didn't jump to line 889 because the condition on line 888 was never true

889 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

890 

891 if host_request.surfer_user_id == context.user_id: 891 ↛ 892line 891 didn't jump to line 892 because the condition on line 891 was never true

892 if not host_request.surfer_last_seen_message_id <= request.last_seen_message_id: 

893 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages") 

894 host_request.surfer_last_seen_message_id = request.last_seen_message_id 

895 else: 

896 if not host_request.host_last_seen_message_id <= request.last_seen_message_id: 

897 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages") 

898 host_request.host_last_seen_message_id = request.last_seen_message_id 

899 

900 session.commit() 

901 return empty_pb2.Empty() 

902 

903 def SetHostRequestArchiveStatus( 

904 self, request: requests_pb2.SetHostRequestArchiveStatusReq, context: CouchersContext, session: Session 

905 ) -> requests_pb2.SetHostRequestArchiveStatusRes: 

906 host_request = session.execute( 

907 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False) 

908 .where(HostRequest.conversation_id == request.host_request_id) 

909 .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id)) 

910 ).scalar_one_or_none() 

911 

912 if not host_request: 912 ↛ 913line 912 didn't jump to line 913 because the condition on line 912 was never true

913 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

914 

915 if context.user_id == host_request.surfer_user_id: 915 ↛ 918line 915 didn't jump to line 918 because the condition on line 915 was always true

916 host_request.is_surfer_archived = request.is_archived 

917 else: 

918 host_request.is_host_archived = request.is_archived 

919 

920 return requests_pb2.SetHostRequestArchiveStatusRes( 

921 host_request_id=host_request.conversation_id, 

922 is_archived=request.is_archived, 

923 ) 

924 

925 def GetResponseRate( 

926 self, request: requests_pb2.GetResponseRateReq, context: CouchersContext, session: Session 

927 ) -> requests_pb2.GetResponseRateRes: 

928 user_res = session.execute( 

929 select(User.id, UserResponseRate) 

930 .outerjoin(UserResponseRate, UserResponseRate.user_id == User.id) 

931 .where(users_visible(context, User)) 

932 .where(User.id == request.user_id) 

933 ).one_or_none() 

934 

935 # if user doesn't exist, return None 

936 if not user_res: 

937 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

938 

939 user, response_rates = user_res 

940 return requests_pb2.GetResponseRateRes(**response_rate_to_pb(response_rates)) # type: ignore[arg-type] 

941 

942 def SendHostRequestFeedback( 

943 self, request: requests_pb2.SendHostRequestFeedbackReq, context: CouchersContext, session: Session 

944 ) -> empty_pb2.Empty: 

945 host_request = session.execute( 

946 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False) 

947 .where(HostRequest.conversation_id == request.host_request_id) 

948 .where(HostRequest.host_user_id == context.user_id) 

949 ).scalar_one_or_none() 

950 

951 if not host_request: 

952 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

953 

954 feedback = session.execute( 

955 select(HostRequestFeedback) 

956 .where(HostRequestFeedback.host_request_id == host_request.conversation_id) 

957 .where(HostRequestFeedback.from_user_id == context.user_id) 

958 ).scalar_one_or_none() 

959 

960 if feedback: 

961 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_left_host_request_feedback") 

962 

963 session.add( 

964 HostRequestFeedback( 

965 host_request_id=host_request.conversation_id, 

966 from_user_id=host_request.host_user_id, 

967 to_user_id=host_request.surfer_user_id, 

968 request_quality=hostrequestquality2sql.get(request.host_request_quality), 

969 decline_reason=request.decline_reason, 

970 ) 

971 ) 

972 quality = hostrequestquality2sql.get(request.host_request_quality) 

973 log_event( 

974 context, 

975 session, 

976 "host_request.feedback_submitted", 

977 { 

978 "host_request_id": host_request.conversation_id, 

979 "surfer_id": host_request.surfer_user_id, 

980 "host_id": host_request.host_user_id, 

981 "request_quality": quality.name if quality else None, 

982 "has_decline_reason": bool(request.decline_reason), 

983 "host_city": host_request.hosting_city, 

984 }, 

985 ) 

986 

987 return empty_pb2.Empty()