Coverage for src / couchers / servicers / requests.py: 91%

286 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-28 06:27 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy import exists, select 

7from sqlalchemy.orm import Session, aliased 

8from sqlalchemy.sql import and_, func, or_ 

9 

10from couchers.constants import HOST_REQUEST_MIN_LENGTH_UTF16 

11from couchers.context import CouchersContext 

12from couchers.helpers.completed_profile import has_completed_profile 

13from couchers.materialized_views import UserResponseRate 

14from couchers.metrics import ( 

15 account_age_on_host_request_create_histogram, 

16 host_request_first_response_histogram, 

17 host_request_responses_counter, 

18 host_requests_sent_counter, 

19 sent_messages_counter, 

20) 

21from couchers.models import ( 

22 Conversation, 

23 HostRequest, 

24 HostRequestFeedback, 

25 HostRequestQuality, 

26 HostRequestStatus, 

27 Message, 

28 MessageType, 

29 ModerationObjectType, 

30 RateLimitAction, 

31 User, 

32) 

33from couchers.models.notifications import NotificationTopicAction 

34from couchers.moderation.utils import create_moderation 

35from couchers.notifications.notify import notify 

36from couchers.proto import conversations_pb2, notification_data_pb2, requests_pb2, requests_pb2_grpc 

37from couchers.rate_limits.check import process_rate_limits_and_check_abort 

38from couchers.rate_limits.definitions import RATE_LIMIT_HOURS 

39from couchers.servicers.api import response_rate_to_pb, user_model_to_pb 

40from couchers.sql import to_bool, users_visible, where_moderated_content_visible, where_users_column_visible 

41from couchers.utils import ( 

42 Timestamp_from_datetime, 

43 date_to_api, 

44 get_coordinates, 

45 now, 

46 parse_date, 

47 today_in_timezone, 

48) 

49 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Page size used by the paginated RPCs below when the request's `number` is not positive.
DEFAULT_PAGINATION_LENGTH = 10
# Upper bound applied to any requested page size.
MAX_PAGE_SIZE = 50

54 

55 

# Translates the database-side HostRequestStatus enum into the matching
# conversations_pb2 status value used in API responses.
hostrequeststatus2api = {
    HostRequestStatus.pending: conversations_pb2.HOST_REQUEST_STATUS_PENDING,
    HostRequestStatus.accepted: conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
    HostRequestStatus.rejected: conversations_pb2.HOST_REQUEST_STATUS_REJECTED,
    HostRequestStatus.confirmed: conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED,
    HostRequestStatus.cancelled: conversations_pb2.HOST_REQUEST_STATUS_CANCELLED,
}

63 

# Translates the API-side request quality value into the database-side
# HostRequestQuality enum.
#
# LOW and OKAY previously pointed at each other's enum member (LOW -> okay_quality,
# OKAY -> low_quality); each now maps to the member with the matching name.
#
# NOTE(review): UNSPECIFIED is mapped to high_quality, which is kept as it was.
# It is not clear from this module whether that default is intentional -- confirm
# against the proto definition and the HostRequestQuality model.
hostrequestquality2sql = {
    requests_pb2.HOST_REQUEST_QUALITY_UNSPECIFIED: HostRequestQuality.high_quality,
    requests_pb2.HOST_REQUEST_QUALITY_LOW: HostRequestQuality.low_quality,
    requests_pb2.HOST_REQUEST_QUALITY_OKAY: HostRequestQuality.okay_quality,
}

69 

70 

def message_to_pb(message: Message) -> conversations_pb2.Message:
    """
    Converts a Message model into its protocol buffer representation.

    Normal messages carry their text. Any other message is a control message and
    carries either a "chat created" marker or the host request status it switched to,
    depending on its message type.
    """
    # Fields shared by every kind of message.
    common = {
        "message_id": message.id,
        "author_user_id": message.author_id,
        "time": Timestamp_from_datetime(message.time),
    }

    if message.is_normal_message:
        return conversations_pb2.Message(
            **common,
            text=conversations_pb2.MessageContentText(text=message.text),
        )

    chat_created = None
    if message.message_type == MessageType.chat_created:
        chat_created = conversations_pb2.MessageContentChatCreated()

    status_changed = None
    if message.message_type == MessageType.host_request_status_changed:
        status_changed = conversations_pb2.MessageContentHostRequestStatusChanged(
            status=hostrequeststatus2api[message.host_request_status_target]  # type: ignore[index]
        )

    # Content fields that stay None are passed through to the constructor as-is.
    return conversations_pb2.Message(
        **common,
        chat_created=chat_created,
        host_request_status_changed=status_changed,
    )

100 

101 

def host_request_to_pb(
    host_request: HostRequest, session: Session, context: CouchersContext
) -> requests_pb2.HostRequest:
    """
    Builds the protocol buffer for a host request as seen by the calling user.

    Queries the conversation's first message (its time becomes `created`) and its
    latest message. `last_seen_message_id` is the surfer's marker when the caller is the
    surfer, otherwise the host's. `need_host_request_feedback` is set only when the
    caller is the host, the request is rejected, and no HostRequestFeedback row from the
    caller exists for this request.
    """
    conversation_messages = select(Message).where(Message.conversation_id == host_request.conversation_id)

    first_message = session.execute(conversation_messages.order_by(Message.id.asc()).limit(1)).scalar_one()
    newest_message = session.execute(conversation_messages.order_by(Message.id.desc()).limit(1)).scalar_one()

    lat, lng = get_coordinates(host_request.hosting_location)

    caller_is_host = context.user_id == host_request.host_user_id
    if caller_is_host and host_request.status == HostRequestStatus.rejected:
        feedback_already_given = session.execute(
            select(
                exists().where(
                    HostRequestFeedback.from_user_id == context.user_id,
                    HostRequestFeedback.host_request_id == host_request.conversation_id,
                )
            )
        ).scalar_one()
        need_feedback = not feedback_already_given
    else:
        need_feedback = False

    if context.user_id == host_request.surfer_user_id:
        last_seen_message_id = host_request.surfer_last_seen_message_id
    else:
        last_seen_message_id = host_request.host_last_seen_message_id

    return requests_pb2.HostRequest(
        host_request_id=host_request.conversation_id,
        surfer_user_id=host_request.surfer_user_id,
        host_user_id=host_request.host_user_id,
        status=hostrequeststatus2api[host_request.status],
        created=Timestamp_from_datetime(first_message.time),
        from_date=date_to_api(host_request.from_date),
        to_date=date_to_api(host_request.to_date),
        last_seen_message_id=last_seen_message_id,
        latest_message=message_to_pb(newest_message),
        hosting_city=host_request.hosting_city,
        hosting_lat=lat,
        hosting_lng=lng,
        hosting_radius=host_request.hosting_radius,
        need_host_request_feedback=need_feedback,
    )

152 

153 

def _possibly_observe_first_response_time(
    session: Session, host_request: HostRequest, user_id: int, response_type: str
) -> None:
    """
    Records the host's first-response latency if the host has not yet written anything.

    Must be called before the host's new message is stored: the check is "the host has
    authored zero messages in this conversation so far". When that holds, the seconds
    elapsed since the conversation was created are observed on the first-response
    histogram, labelled with host gender, surfer gender and `response_type`.
    """
    assert host_request.host_user_id == user_id

    messages_by_host = session.execute(
        select(func.count()).where(
            Message.conversation_id == host_request.conversation_id,
            Message.author_id == user_id,
        )
    ).scalar_one_or_none()

    if messages_by_host != 0:
        # The host has responded before (or no count came back): nothing to record.
        return

    def gender_of(some_user_id: int):
        return session.execute(select(User.gender).where(User.id == some_user_id)).scalar_one()

    host_gender = gender_of(host_request.host_user_id)
    surfer_gender = gender_of(host_request.surfer_user_id)
    histogram = host_request_first_response_histogram.labels(host_gender, surfer_gender, response_type)
    histogram.observe((now() - host_request.conversation.created).total_seconds())

172 

173 

def _is_host_request_long_enough(text: str) -> bool:
    """
    Returns whether `text` reaches HOST_REQUEST_MIN_LENGTH_UTF16 UTF-16 code units.

    Python's len(str) counts code points, while JavaScript's string.length counts
    UTF-16 code units. The two differ for characters outside the Basic Multilingual
    Plane: e.g. U+1F600 has length 1 in Python but length 2 in JavaScript. Measuring
    in UTF-16 code units keeps this check in line with the frontend's validation.
    """
    # "utf-16-le" emits no BOM, so every code unit is exactly two bytes.
    encoded = text.encode("utf-16-le")
    code_units = len(encoded) // 2
    return code_units >= HOST_REQUEST_MIN_LENGTH_UTF16

180 

181 

182class Requests(requests_pb2_grpc.RequestsServicer): 

def CreateHostRequest(
    self, request: requests_pb2.CreateHostRequestReq, context: CouchersContext, session: Session
) -> requests_pb2.CreateHostRequestRes:
    """
    Creates a new host request from the calling user (the surfer) to `request.host_user_id`.

    Validation, in order: completed profile, not requesting oneself, host exists and is
    visible, both dates parse, date range sanity (relative to "today" in the host's
    timezone), minimum text length, and the host-request rate limit. Each failure
    aborts the RPC with the corresponding error code.

    On success a conversation is created with a "chat created" control message and the
    request text, a moderation state is created, the HostRequest row is stored, the host
    is notified, and metrics are updated. Returns the new host request id.
    """
    surfer = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
    if not has_completed_profile(session, surfer):
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_request")

    if request.host_user_id == context.user_id:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_request_self")

    # The host has to exist and be visible to the caller.
    host_query = select(User).where(users_visible(context, User)).where(User.id == request.host_user_id)
    host = session.execute(host_query).scalar_one_or_none()
    if not host:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    from_date = parse_date(request.from_date)
    to_date = parse_date(request.to_date)
    if not (from_date and to_date):
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_date")

    today = today_in_timezone(host.timezone)
    one_year = timedelta(days=365)

    # Checked in this order; there is no need to compare today with to_date because
    # from_date >= today and to_date > from_date already imply it.
    date_problems = (
        (from_date < today, "date_from_before_today"),
        (from_date >= to_date, "date_from_after_to"),
        (from_date - today > one_year, "date_from_after_one_year"),
        (to_date - from_date > one_year, "date_to_after_one_year"),
    )
    for is_violated, error_code in date_problems:
        if is_violated:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, error_code)

    if not _is_host_request_long_enough(request.text):
        context.abort_with_error_code(
            grpc.StatusCode.INVALID_ARGUMENT,
            "host_request_too_short",
            substitutions={"chars": str(HOST_REQUEST_MIN_LENGTH_UTF16)},
        )

    # Abort if the user has been sending host requests excessively.
    rate_limited = process_rate_limits_and_check_abort(
        session=session, user_id=context.user_id, action=RateLimitAction.host_request
    )
    if rate_limited:
        context.abort_with_error_code(
            grpc.StatusCode.RESOURCE_EXHAUSTED,
            "host_request_rate_limit",
            substitutions={"hours": str(RATE_LIMIT_HOURS)},
        )

    conversation = Conversation()
    session.add(conversation)
    session.flush()  # assigns conversation.id

    chat_created_message = Message(
        conversation_id=conversation.id,
        author_id=context.user_id,
        message_type=MessageType.chat_created,
    )
    session.add(chat_created_message)

    request_message = Message(
        conversation_id=conversation.id,
        author_id=context.user_id,
        text=request.text,
        message_type=MessageType.text,
    )
    session.add(request_message)
    session.flush()  # assigns request_message.id

    # Create moderation state for UMS (starts as SHADOWED)
    moderation_state = create_moderation(
        session=session,
        object_type=ModerationObjectType.HOST_REQUEST,
        object_id=conversation.id,
        creator_user_id=context.user_id,
    )

    host_request = HostRequest(
        conversation_id=conversation.id,
        surfer_user_id=context.user_id,
        host_user_id=host.id,
        moderation_state_id=moderation_state.id,
        from_date=from_date,
        to_date=to_date,
        status=HostRequestStatus.pending,
        surfer_last_seen_message_id=request_message.id,
        # TODO: tz
        # timezone=host.timezone,
        hosting_city=host.city,
        hosting_location=host.geom,
        hosting_radius=host.geom_radius,
    )
    session.add(host_request)
    session.flush()

    notification_data = notification_data_pb2.HostRequestCreate(
        host_request=host_request_to_pb(host_request, session, context),
        surfer=user_model_to_pb(host_request.surfer, session, context),
        text=request.text,
    )
    notify(
        session,
        user_id=host_request.host_user_id,
        topic_action=NotificationTopicAction.host_request__create,
        key=str(host_request.conversation_id),
        data=notification_data,
        moderation_state_id=moderation_state.id,
    )

    host_requests_sent_counter.labels(surfer.gender, host.gender).inc()
    sent_messages_counter.labels(surfer.gender, "host request send").inc()
    account_age_seconds = (now() - surfer.joined).total_seconds()
    account_age_on_host_request_create_histogram.labels(surfer.gender, host.gender).observe(account_age_seconds)

    return requests_pb2.CreateHostRequestRes(host_request_id=host_request.conversation_id)

309 

def GetHostRequest(
    self, request: requests_pb2.GetHostRequestReq, context: CouchersContext, session: Session
) -> requests_pb2.HostRequest:
    """
    Returns a single host request the calling user takes part in.

    The lookup is restricted by user visibility of both the surfer and the host, by
    moderation visibility, and to requests where the caller is the surfer or the host.
    Aborts with NOT_FOUND ("host_request_not_found") when nothing matches.
    """
    query = select(HostRequest)
    query = where_users_column_visible(query, context, HostRequest.surfer_user_id)
    query = where_users_column_visible(query, context, HostRequest.host_user_id)
    query = where_moderated_content_visible(query, context, HostRequest, is_list_operation=False)
    query = query.where(HostRequest.conversation_id == request.host_request_id)

    caller_takes_part = or_(
        HostRequest.surfer_user_id == context.user_id,
        HostRequest.host_user_id == context.user_id,
    )
    host_request = session.execute(query.where(caller_takes_part)).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    return host_request_to_pb(host_request, session, context)

336 

def ListHostRequests(
    self, request: requests_pb2.ListHostRequestsReq, context: CouchersContext, session: Session
) -> requests_pb2.ListHostRequestsRes:
    """
    Lists host requests involving the calling user, ordered by latest message id, descending.

    Filters:
      * `only_sent` / `only_received` restrict to requests where the caller is the surfer /
        the host; setting both aborts with INVALID_ARGUMENT.
      * `only_archived` (when present and neither of the above is set) matches the caller's
        own archived flag against the given value.
      * `only_active` keeps pending, accepted or confirmed requests whose end time has not
        passed yet.

    Pagination: `number` is the page size (defaulted and capped), `last_request_id` is
    the cursor from the previous page. Note that the cursor is a message id, not a host
    request id. The response's `last_request_id` is 0 when there are no more results.
    """
    if request.only_sent and request.only_received:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received")

    pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
    pagination = min(pagination, MAX_PAGE_SIZE)

    # By outer joining messages on itself where the second id is bigger, only the highest IDs will have
    # none as message_2.id. So just filter for these to get the highest messages only.
    # See https://stackoverflow.com/a/27802817/6115336
    message_2 = aliased(Message)
    statement = where_moderated_content_visible(
        where_users_column_visible(
            where_users_column_visible(
                select(Message, HostRequest, Conversation)
                .outerjoin(
                    message_2, and_(Message.conversation_id == message_2.conversation_id, Message.id < message_2.id)
                )
                .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
                .join(Conversation, Conversation.id == HostRequest.conversation_id),
                context,
                HostRequest.surfer_user_id,
            ),
            context,
            HostRequest.host_user_id,
        ),
        context,
        HostRequest,
        is_list_operation=True,
    ).where(message_2.id == None)  # noqa: E711 -- SQL "IS NULL" comparison, must not be `is None`

    if request.last_request_id != 0:
        statement = statement.where(Message.id < request.last_request_id)

    if request.only_sent:
        statement = statement.where(HostRequest.surfer_user_id == context.user_id)
    elif request.only_received:
        statement = statement.where(HostRequest.host_user_id == context.user_id)
    elif request.HasField("only_archived"):
        statement = statement.where(
            or_(
                and_(
                    HostRequest.surfer_user_id == context.user_id,
                    HostRequest.is_surfer_archived == request.only_archived,
                ),
                and_(
                    HostRequest.host_user_id == context.user_id,
                    HostRequest.is_host_archived == request.only_archived,
                ),
            )
        )
    else:
        statement = statement.where(
            or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id)
        )

    # TODO: I considered having the latest control message be the single source of truth for
    # the HostRequest.status, but decided against it because of this filter.
    # Another possibility is to filter in the python instead of SQL, but that's slower
    if request.only_active:
        statement = statement.where(
            or_(
                HostRequest.status == HostRequestStatus.pending,
                HostRequest.status == HostRequestStatus.accepted,
                HostRequest.status == HostRequestStatus.confirmed,
            )
        )
        # A request is only active while it has not ended yet. This mirrors
        # RespondHostRequest, which treats `end_time < now()` as "in the past".
        # (The comparison used to be `<=`, which selected only ended requests.)
        statement = statement.where(HostRequest.end_time >= func.now())

    # Fetch one extra row to learn whether another page exists.
    statement = statement.order_by(Message.id.desc()).limit(pagination + 1)
    results = session.execute(statement).all()
    page = results[:pagination]

    host_requests = []
    for result in page:
        lat, lng = get_coordinates(result.HostRequest.hosting_location)
        host_requests.append(
            requests_pb2.HostRequest(
                host_request_id=result.HostRequest.conversation_id,
                surfer_user_id=result.HostRequest.surfer_user_id,
                host_user_id=result.HostRequest.host_user_id,
                status=hostrequeststatus2api[result.HostRequest.status],
                created=Timestamp_from_datetime(result.Conversation.created),
                from_date=date_to_api(result.HostRequest.from_date),
                to_date=date_to_api(result.HostRequest.to_date),
                last_seen_message_id=(
                    result.HostRequest.surfer_last_seen_message_id
                    if context.user_id == result.HostRequest.surfer_user_id
                    else result.HostRequest.host_last_seen_message_id
                ),
                latest_message=message_to_pb(result.Message),
                hosting_city=result.HostRequest.hosting_city,
                hosting_lat=lat,
                hosting_lng=lng,
                hosting_radius=result.HostRequest.hosting_radius,
            )
        )

    no_more = len(results) <= pagination
    # The cursor for the next page is the smallest message id on this page.
    last_request_id = 0 if no_more else min(row.Message.id for row in page)  # TODO

    return requests_pb2.ListHostRequestsRes(
        last_request_id=last_request_id, no_more=no_more, host_requests=host_requests
    )

441 

def RespondHostRequest(
    self, request: requests_pb2.RespondHostRequestReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Changes the status of a host request and optionally attaches a text message.

    Allowed transitions:
      * ACCEPTED  - host only; not from cancelled, confirmed or accepted.
      * REJECTED  - host only; not from cancelled or rejected.
      * CONFIRMED - surfer only; only from accepted.
      * CANCELLED - surfer only; not from rejected or cancelled.
    PENDING can never be requested, and requests whose end time has passed cannot be
    changed. Violations abort the RPC.

    Each transition flushes the new status, notifies the other party and bumps the
    response metrics. Afterwards a status-change control message is stored (plus a text
    message when `request.text` is set) and the caller's last-seen marker is moved to
    the newest of those messages.
    """

    def gender_of(user_id: int):
        return session.execute(select(User.gender).where(User.id == user_id)).scalar_one()

    def count_host_response(other_user_id: int, response_type: str) -> None:
        user_gender = gender_of(context.user_id)
        other_gender = gender_of(other_user_id)
        host_request_responses_counter.labels(user_gender, other_gender, response_type).inc()
        sent_messages_counter.labels(user_gender, "host request response").inc()

    lookup = select(HostRequest)
    lookup = where_users_column_visible(lookup, context, HostRequest.surfer_user_id)
    lookup = where_users_column_visible(lookup, context, HostRequest.host_user_id)
    lookup = where_moderated_content_visible(lookup, context, HostRequest, is_list_operation=False)
    lookup = lookup.where(HostRequest.conversation_id == request.host_request_id)
    host_request = session.execute(lookup).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    # Requests the caller takes no part in are reported as not found as well.
    if context.user_id not in (host_request.surfer_user_id, host_request.host_user_id):
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    if request.status == conversations_pb2.HOST_REQUEST_STATUS_PENDING:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")

    if host_request.end_time < now():
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_in_past")

    # Created up front, but only added to the session after the transition is handled.
    control_message = Message(
        message_type=MessageType.host_request_status_changed,
        conversation_id=host_request.conversation_id,
        author_id=context.user_id,
    )

    def notify_other_party(recipient_id: int, topic_action, data) -> None:
        notify(
            session,
            user_id=recipient_id,
            topic_action=topic_action,
            key=str(host_request.conversation_id),
            data=data,
            moderation_state_id=host_request.moderation_state_id,
        )

    if request.status == conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED:
        # only host can accept
        if context.user_id != host_request.host_user_id:
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "not_the_host")
        # can't accept a cancelled or confirmed request (only reject), or already accepted
        if host_request.status in (
            HostRequestStatus.cancelled,
            HostRequestStatus.confirmed,
            HostRequestStatus.accepted,
        ):
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
        _possibly_observe_first_response_time(session, host_request, context.user_id, "accepted")
        control_message.host_request_status_target = HostRequestStatus.accepted
        host_request.status = HostRequestStatus.accepted
        session.flush()

        notify_other_party(
            host_request.surfer_user_id,
            NotificationTopicAction.host_request__accept,
            notification_data_pb2.HostRequestAccept(
                host_request=host_request_to_pb(host_request, session, context),
                host=user_model_to_pb(host_request.host, session, context),
            ),
        )
        count_host_response(host_request.surfer_user_id, "accepted")

    elif request.status == conversations_pb2.HOST_REQUEST_STATUS_REJECTED:
        # only host can reject
        if context.user_id != host_request.host_user_id:
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
        # can't reject a cancelled or already rejected request
        if host_request.status in (HostRequestStatus.cancelled, HostRequestStatus.rejected):
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
        _possibly_observe_first_response_time(session, host_request, context.user_id, "rejected")
        control_message.host_request_status_target = HostRequestStatus.rejected
        host_request.status = HostRequestStatus.rejected
        session.flush()

        notify_other_party(
            host_request.surfer_user_id,
            NotificationTopicAction.host_request__reject,
            notification_data_pb2.HostRequestReject(
                host_request=host_request_to_pb(host_request, session, context),
                host=user_model_to_pb(host_request.host, session, context),
            ),
        )
        count_host_response(host_request.surfer_user_id, "rejected")

    elif request.status == conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED:
        # only surfer can confirm
        if context.user_id != host_request.surfer_user_id:
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
        # can only confirm an accepted request
        if host_request.status != HostRequestStatus.accepted:
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
        control_message.host_request_status_target = HostRequestStatus.confirmed
        host_request.status = HostRequestStatus.confirmed
        session.flush()

        notify_other_party(
            host_request.host_user_id,
            NotificationTopicAction.host_request__confirm,
            notification_data_pb2.HostRequestConfirm(
                host_request=host_request_to_pb(host_request, session, context),
                surfer=user_model_to_pb(host_request.surfer, session, context),
            ),
        )
        count_host_response(host_request.host_user_id, "confirmed")

    elif request.status == conversations_pb2.HOST_REQUEST_STATUS_CANCELLED:
        # only surfer can cancel
        if context.user_id != host_request.surfer_user_id:
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
        # can't cancel an already cancelled or rejected request
        if host_request.status in (HostRequestStatus.rejected, HostRequestStatus.cancelled):
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
        control_message.host_request_status_target = HostRequestStatus.cancelled
        host_request.status = HostRequestStatus.cancelled
        session.flush()

        notify_other_party(
            host_request.host_user_id,
            NotificationTopicAction.host_request__cancel,
            notification_data_pb2.HostRequestCancel(
                host_request=host_request_to_pb(host_request, session, context),
                surfer=user_model_to_pb(host_request.surfer, session, context),
            ),
        )
        count_host_response(host_request.host_user_id, "cancelled")

    session.add(control_message)

    # The caller has, by definition, seen whatever they just wrote.
    latest_message = control_message
    if request.text:
        latest_message = Message(
            conversation_id=host_request.conversation_id,
            text=request.text,
            author_id=context.user_id,
            message_type=MessageType.text,
        )
        session.add(latest_message)

    session.flush()  # assigns the message ids

    if host_request.surfer_user_id == context.user_id:
        host_request.surfer_last_seen_message_id = latest_message.id
    else:
        host_request.host_last_seen_message_id = latest_message.id
    session.commit()

    return empty_pb2.Empty()

615 

def GetHostRequestMessages(
    self, request: requests_pb2.GetHostRequestMessagesReq, context: CouchersContext, session: Session
) -> requests_pb2.GetHostRequestMessagesRes:
    """
    Returns a page of messages of a host request, newest first.

    The host request must pass the moderation visibility filter and the caller must be
    its surfer or host; otherwise the RPC aborts with NOT_FOUND. `last_message_id` in the
    request is the cursor (0 means "start from the newest message"); the response's
    `last_message_id` is the smallest message id on the returned page, or 0 when no
    messages were found.
    """
    lookup = where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False)
    host_request = session.execute(
        lookup.where(HostRequest.conversation_id == request.host_request_id)
    ).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    pagination = min(
        request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH,
        MAX_PAGE_SIZE,
    )

    # A cursor of 0 disables the "older than" condition.
    older_than_cursor = or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0))
    message_query = (
        select(Message)
        .where(Message.conversation_id == host_request.conversation_id)
        .where(older_than_cursor)
        .order_by(Message.id.desc())
        .limit(pagination + 1)  # one extra row reveals whether more messages exist
    )
    messages = session.execute(message_query).scalars().all()
    page = messages[:pagination]

    if len(messages) > 0:
        last_message_id = min(m.id if m else 1 for m in page)
    else:
        last_message_id = 0

    return requests_pb2.GetHostRequestMessagesRes(
        last_message_id=last_message_id,
        no_more=len(messages) <= pagination,
        messages=[message_to_pb(m) for m in page],
    )

655 

def SendHostRequestMessage(
    self, request: requests_pb2.SendHostRequestMessageReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Posts a text message in a host request's conversation.

    Aborts on empty text, and with NOT_FOUND when the host request is not visible or the
    caller is neither its surfer nor its host. When the host is the sender, the
    first-response metric is considered before the message is stored. After storing the
    message the sender's last-seen marker is advanced, the other party is notified, the
    session is committed and the sent-messages counter is incremented.
    """
    if request.text == "":
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

    lookup = where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False)
    host_request = session.execute(
        lookup.where(HostRequest.conversation_id == request.host_request_id)
    ).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    # Has to run before the new message exists, since it counts the host's prior messages.
    if host_request.host_user_id == context.user_id:
        _possibly_observe_first_response_time(session, host_request, context.user_id, "message")

    new_message = Message(
        conversation_id=host_request.conversation_id,
        author_id=context.user_id,
        message_type=MessageType.text,
        text=request.text,
    )
    session.add(new_message)
    session.flush()  # assigns new_message.id

    if host_request.surfer_user_id == context.user_id:
        # Surfer wrote: the host gets notified about a message from the surfer.
        host_request.surfer_last_seen_message_id = new_message.id
        recipient_id = host_request.host_user_id
        sender = host_request.surfer
        recipient_is_host = True
    else:
        # Host wrote: the surfer gets notified about a message from the host.
        host_request.host_last_seen_message_id = new_message.id
        recipient_id = host_request.surfer_user_id
        sender = host_request.host
        recipient_is_host = False

    notify(
        session,
        user_id=recipient_id,
        topic_action=NotificationTopicAction.host_request__message,
        key=str(host_request.conversation_id),
        data=notification_data_pb2.HostRequestMessage(
            host_request=host_request_to_pb(host_request, session, context),
            user=user_model_to_pb(sender, session, context),
            text=request.text,
            am_host=recipient_is_host,
        ),
        moderation_state_id=host_request.moderation_state_id,
    )

    session.commit()

    sender_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
    sent_messages_counter.labels(sender_gender, "host request").inc()

    return empty_pb2.Empty()

726 

def GetHostRequestUpdates(
    self, request: requests_pb2.GetHostRequestUpdatesReq, context: CouchersContext, session: Session
) -> requests_pb2.GetHostRequestUpdatesRes:
    """
    Returns messages newer than `request.newest_message_id` from the caller's host requests.

    `only_sent` / `only_received` restrict the result to requests where the caller is the
    surfer / the host (setting both aborts); by default both roles are included.
    `newest_message_id` must be non-zero and refer to an existing message, otherwise the
    RPC aborts with INVALID_ARGUMENT ("invalid_message"). Results are ordered by message
    id ascending and limited to the page size; `no_more` tells whether further updates
    exist beyond this page.
    """
    if request.only_sent and request.only_received:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received")

    if request.newest_message_id == 0:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

    if not session.execute(select(Message).where(Message.id == request.newest_message_id)).scalar_one_or_none():
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

    pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
    pagination = min(pagination, MAX_PAGE_SIZE)

    # NOTE(review): this is a multi-row query but passes is_list_operation=False, unlike
    # ListHostRequests -- confirm that this is intended.
    statement = where_moderated_content_visible(
        select(
            Message,
            HostRequest.status.label("host_request_status"),
            HostRequest.conversation_id.label("host_request_id"),
        )
        .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
        .where(Message.id > request.newest_message_id),
        context,
        HostRequest,
        is_list_operation=False,
    )

    if request.only_sent:
        statement = statement.where(HostRequest.surfer_user_id == context.user_id)
    elif request.only_received:
        statement = statement.where(HostRequest.host_user_id == context.user_id)
    else:
        statement = statement.where(
            or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id)
        )

    # Fetch one extra row to learn whether more updates exist beyond this page.
    statement = statement.order_by(Message.id.asc()).limit(pagination + 1)
    res = session.execute(statement).all()

    no_more = len(res) <= pagination

    # A `last_message_id` local used to be computed here but was never returned or
    # otherwise used, so it has been dropped.
    # NOTE(review): the response carries no explicit cursor; presumably callers continue
    # from the id of the last returned message -- confirm against the client.
    return requests_pb2.GetHostRequestUpdatesRes(
        no_more=no_more,
        updates=[
            requests_pb2.HostRequestUpdate(
                host_request_id=result.host_request_id,
                status=hostrequeststatus2api[result.host_request_status],
                message=message_to_pb(result.Message),
            )
            for result in res[:pagination]
        ],
    )

782 

def MarkLastSeenHostRequest(
    self, request: requests_pb2.MarkLastSeenHostRequestReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Advance the calling user's "last seen message" marker on a host request.

    The host request is looked up by ``request.host_request_id`` through the
    moderation visibility filter. The caller must be either the surfer or the
    host of that request; the marker that belongs to the caller's role is then
    set to ``request.last_seen_message_id`` and the session is committed.

    Aborts with:
      * NOT_FOUND ("host_request_not_found") if no such visible host request
        exists, or if the caller is neither its surfer nor its host.
      * FAILED_PRECONDITION ("cant_unsee_messages") if the new marker would be
        lower than the one currently stored.
    """
    visible_host_requests = where_moderated_content_visible(
        select(HostRequest), context, HostRequest, is_list_operation=False
    )
    host_request = session.execute(
        visible_host_requests.where(HostRequest.conversation_id == request.host_request_id)
    ).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    caller_id = context.user_id

    # non-participants get the same error as a missing request
    if caller_id not in (host_request.surfer_user_id, host_request.host_user_id):
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    # each side of the request tracks its own marker; pick the caller's
    if host_request.surfer_user_id == caller_id:
        marker_attr = "surfer_last_seen_message_id"
    else:
        marker_attr = "host_last_seen_message_id"

    # the marker may only stay put or move forward
    if getattr(host_request, marker_attr) > request.last_seen_message_id:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
    setattr(host_request, marker_attr, request.last_seen_message_id)

    session.commit()
    return empty_pb2.Empty()

809 

def SetHostRequestArchiveStatus(
    self, request: requests_pb2.SetHostRequestArchiveStatusReq, context: CouchersContext, session: Session
) -> requests_pb2.SetHostRequestArchiveStatusRes:
    """
    Set the archived flag of a host request for the calling user only.

    The host request is looked up by ``request.host_request_id`` through the
    moderation visibility filter and restricted to requests where the caller
    is the surfer or the host. The surfer and the host each have their own
    flag (``is_surfer_archived`` / ``is_host_archived``); only the one matching
    the caller's role is written.

    Aborts with NOT_FOUND ("host_request_not_found") when no matching host
    request is found.

    Returns the host request id together with the archive state that was
    requested.
    """
    caller_id = context.user_id
    caller_is_participant = or_(
        HostRequest.surfer_user_id == caller_id,
        HostRequest.host_user_id == caller_id,
    )

    statement = (
        where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False)
        .where(HostRequest.conversation_id == request.host_request_id)
        .where(caller_is_participant)
    )
    host_request = session.execute(statement).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    archived = request.is_archived
    if caller_id == host_request.surfer_user_id:
        host_request.is_surfer_archived = archived
    else:
        # the query above guarantees the caller is the host in this branch
        host_request.is_host_archived = archived

    return requests_pb2.SetHostRequestArchiveStatusRes(
        host_request_id=host_request.conversation_id,
        is_archived=archived,
    )

831 

def GetResponseRate(
    self, request: requests_pb2.GetResponseRateReq, context: CouchersContext, session: Session
) -> requests_pb2.GetResponseRateRes:
    """
    Return the response-rate information for the user ``request.user_id``.

    The user is selected subject to ``users_visible`` for the calling context
    and outer-joined to ``UserResponseRate``, so a visible user without a
    response-rate row still yields a result (with the response-rate entity
    being None). Whatever was loaded is handed to ``response_rate_to_pb`` and
    its result is expanded into the response message.

    Aborts with NOT_FOUND ("user_not_found") when the query yields no row.
    """
    statement = (
        select(User.id, UserResponseRate)
        .outerjoin(UserResponseRate, UserResponseRate.user_id == User.id)
        .where(users_visible(context, User))
        .where(User.id == request.user_id)
    )
    row = session.execute(statement).one_or_none()

    # no row means there is no visible user with that id
    if not row:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    # the id column is only selected to anchor the outer join; it is not used further
    _user_id, response_rates = row
    return requests_pb2.GetResponseRateRes(**response_rate_to_pb(response_rates))  # type: ignore[arg-type]

848 

def SendHostRequestFeedback(
    self, request: requests_pb2.SendHostRequestFeedbackReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Store the host's feedback about a host request they received.

    The host request is looked up by ``request.host_request_id`` through the
    moderation visibility filter and must have the caller as its host. At most
    one feedback row per (host request, caller) is accepted.

    Aborts with:
      * NOT_FOUND ("host_request_not_found") if no matching host request
        exists for which the caller is the host.
      * FAILED_PRECONDITION ("already_left_host_request_feedback") if the
        caller already has a feedback row for this host request.

    The new row is only added to the session here; this method does not commit
    itself. NOTE(review): presumably the surrounding request handling commits
    the session - confirm.
    """
    host_request_statement = (
        where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False)
        .where(HostRequest.conversation_id == request.host_request_id)
        .where(HostRequest.host_user_id == context.user_id)
    )
    host_request = session.execute(host_request_statement).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    existing_feedback_statement = (
        select(HostRequestFeedback)
        .where(HostRequestFeedback.host_request_id == host_request.conversation_id)
        .where(HostRequestFeedback.from_user_id == context.user_id)
    )
    existing_feedback = session.execute(existing_feedback_statement).scalar_one_or_none()

    if existing_feedback:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_left_host_request_feedback")

    # feedback always goes from the host to the surfer of the request
    new_feedback = HostRequestFeedback(
        host_request_id=host_request.conversation_id,
        from_user_id=host_request.host_user_id,
        to_user_id=host_request.surfer_user_id,
        # .get() yields None when the API value has no entry in the mapping
        request_quality=hostrequestquality2sql.get(request.host_request_quality),
        decline_reason=request.decline_reason,
    )
    session.add(new_feedback)

    return empty_pb2.Empty()