Coverage for src / couchers / servicers / requests.py: 91%

285 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-14 09:03 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy import exists, select 

7from sqlalchemy.orm import Session, aliased 

8from sqlalchemy.sql import and_, func, or_ 

9 

10from couchers.constants import HOST_REQUEST_MIN_LENGTH_UTF16 

11from couchers.context import CouchersContext 

12from couchers.materialized_views import UserResponseRate 

13from couchers.metrics import ( 

14 account_age_on_host_request_create_histogram, 

15 host_request_first_response_histogram, 

16 host_request_responses_counter, 

17 host_requests_sent_counter, 

18 sent_messages_counter, 

19) 

20from couchers.models import ( 

21 Conversation, 

22 HostRequest, 

23 HostRequestFeedback, 

24 HostRequestQuality, 

25 HostRequestStatus, 

26 Message, 

27 MessageType, 

28 ModerationObjectType, 

29 RateLimitAction, 

30 User, 

31) 

32from couchers.models.notifications import NotificationTopicAction 

33from couchers.moderation.utils import create_moderation 

34from couchers.notifications.notify import notify 

35from couchers.proto import conversations_pb2, notification_data_pb2, requests_pb2, requests_pb2_grpc 

36from couchers.rate_limits.check import process_rate_limits_and_check_abort 

37from couchers.rate_limits.definitions import RATE_LIMIT_HOURS 

38from couchers.servicers.api import response_rate_to_pb, user_model_to_pb 

39from couchers.sql import to_bool, users_visible, where_moderated_content_visible, where_users_column_visible 

40from couchers.utils import ( 

41 Timestamp_from_datetime, 

42 date_to_api, 

43 get_coordinates, 

44 now, 

45 parse_date, 

46 today_in_timezone, 

47) 

48 

49logger = logging.getLogger(__name__) 

50 

51DEFAULT_PAGINATION_LENGTH = 10 

52MAX_PAGE_SIZE = 50 

53 

54 

55hostrequeststatus2api = { 

56 HostRequestStatus.pending: conversations_pb2.HOST_REQUEST_STATUS_PENDING, 

57 HostRequestStatus.accepted: conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED, 

58 HostRequestStatus.rejected: conversations_pb2.HOST_REQUEST_STATUS_REJECTED, 

59 HostRequestStatus.confirmed: conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED, 

60 HostRequestStatus.cancelled: conversations_pb2.HOST_REQUEST_STATUS_CANCELLED, 

61} 

62 

63hostrequestquality2sql = { 

64 requests_pb2.HOST_REQUEST_QUALITY_UNSPECIFIED: HostRequestQuality.high_quality, 

65 requests_pb2.HOST_REQUEST_QUALITY_LOW: HostRequestQuality.okay_quality, 

66 requests_pb2.HOST_REQUEST_QUALITY_OKAY: HostRequestQuality.low_quality, 

67} 

68 

69 

70def message_to_pb(message: Message) -> conversations_pb2.Message: 

71 """ 

72 Turns the given message to a protocol buffer 

73 """ 

74 if message.is_normal_message: 

75 return conversations_pb2.Message( 

76 message_id=message.id, 

77 author_user_id=message.author_id, 

78 time=Timestamp_from_datetime(message.time), 

79 text=conversations_pb2.MessageContentText(text=message.text), 

80 ) 

81 else: 

82 return conversations_pb2.Message( 

83 message_id=message.id, 

84 author_user_id=message.author_id, 

85 time=Timestamp_from_datetime(message.time), 

86 chat_created=( 

87 conversations_pb2.MessageContentChatCreated() 

88 if message.message_type == MessageType.chat_created 

89 else None 

90 ), 

91 host_request_status_changed=( 

92 conversations_pb2.MessageContentHostRequestStatusChanged( 

93 status=hostrequeststatus2api[message.host_request_status_target] # type: ignore[index] 

94 ) 

95 if message.message_type == MessageType.host_request_status_changed 

96 else None 

97 ), 

98 ) 

99 

100 

101def host_request_to_pb( 

102 host_request: HostRequest, session: Session, context: CouchersContext 

103) -> requests_pb2.HostRequest: 

104 initial_message = session.execute( 

105 select(Message) 

106 .where(Message.conversation_id == host_request.conversation_id) 

107 .order_by(Message.id.asc()) 

108 .limit(1) 

109 ).scalar_one() 

110 

111 latest_message = session.execute( 

112 select(Message) 

113 .where(Message.conversation_id == host_request.conversation_id) 

114 .order_by(Message.id.desc()) 

115 .limit(1) 

116 ).scalar_one() 

117 

118 lat, lng = get_coordinates(host_request.hosting_location) 

119 

120 need_feedback = False 

121 if context.user_id == host_request.host_user_id and host_request.status == HostRequestStatus.rejected: 

122 need_feedback = not session.execute( 

123 select( 

124 exists().where( 

125 HostRequestFeedback.from_user_id == context.user_id, 

126 HostRequestFeedback.host_request_id == host_request.conversation_id, 

127 ) 

128 ) 

129 ).scalar_one() 

130 

131 return requests_pb2.HostRequest( 

132 host_request_id=host_request.conversation_id, 

133 surfer_user_id=host_request.surfer_user_id, 

134 host_user_id=host_request.host_user_id, 

135 status=hostrequeststatus2api[host_request.status], 

136 created=Timestamp_from_datetime(initial_message.time), 

137 from_date=date_to_api(host_request.from_date), 

138 to_date=date_to_api(host_request.to_date), 

139 last_seen_message_id=( 

140 host_request.surfer_last_seen_message_id 

141 if context.user_id == host_request.surfer_user_id 

142 else host_request.host_last_seen_message_id 

143 ), 

144 latest_message=message_to_pb(latest_message), 

145 hosting_city=host_request.hosting_city, 

146 hosting_lat=lat, 

147 hosting_lng=lng, 

148 hosting_radius=host_request.hosting_radius, 

149 need_host_request_feedback=need_feedback, 

150 ) 

151 

152 

153def _possibly_observe_first_response_time( 

154 session: Session, host_request: HostRequest, user_id: int, response_type: str 

155) -> None: 

156 # if this is the first response then there's nothing by this user yet 

157 assert host_request.host_user_id == user_id 

158 

159 number_messages_by_host = session.execute( 

160 select(func.count()) 

161 .where(Message.conversation_id == host_request.conversation_id) 

162 .where(Message.author_id == user_id) 

163 ).scalar_one_or_none() 

164 

165 if number_messages_by_host == 0: 

166 host_gender = session.execute(select(User.gender).where(User.id == host_request.host_user_id)).scalar_one() 

167 surfer_gender = session.execute(select(User.gender).where(User.id == host_request.surfer_user_id)).scalar_one() 

168 host_request_first_response_histogram.labels(host_gender, surfer_gender, response_type).observe( 

169 (now() - host_request.conversation.created).total_seconds() 

170 ) 

171 

172 

173def _is_host_request_long_enough(text: str) -> bool: 

174 # Python's len(str) does not match Javascript's string.length. 

175 # e.g. len("é") == 2 but "é".length == 1. 

176 # To match the frontend's validation, measure the string in utf16 code units. 

177 text_length_utf16 = len(text.encode("utf-16-le")) // 2 # utf-16-le does not include a prefix BOM code unit. 

178 return text_length_utf16 >= HOST_REQUEST_MIN_LENGTH_UTF16 

179 

180 

181class Requests(requests_pb2_grpc.RequestsServicer): 

182 def CreateHostRequest( 

183 self, request: requests_pb2.CreateHostRequestReq, context: CouchersContext, session: Session 

184 ) -> requests_pb2.CreateHostRequestRes: 

185 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

186 if not user.has_completed_profile: 

187 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_request") 

188 

189 if request.host_user_id == context.user_id: 

190 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_request_self") 

191 

192 # just to check host exists and is visible 

193 host = session.execute( 

194 select(User).where(users_visible(context, User)).where(User.id == request.host_user_id) 

195 ).scalar_one_or_none() 

196 if not host: 

197 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

198 

199 from_date = parse_date(request.from_date) 

200 to_date = parse_date(request.to_date) 

201 

202 if not from_date or not to_date: 

203 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_date") 

204 

205 today = today_in_timezone(host.timezone) 

206 

207 # request starts from the past 

208 if from_date < today: 

209 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_before_today") 

210 

211 # from_date is not >= to_date 

212 if from_date >= to_date: 

213 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_to") 

214 

215 # No need to check today > to_date 

216 

217 if from_date - today > timedelta(days=365): 

218 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_one_year") 

219 

220 if to_date - from_date > timedelta(days=365): 

221 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_to_after_one_year") 

222 

223 # Check minimum length 

224 if not _is_host_request_long_enough(request.text): 

225 context.abort_with_error_code( 

226 grpc.StatusCode.INVALID_ARGUMENT, 

227 "host_request_too_short", 

228 substitutions={"chars": str(HOST_REQUEST_MIN_LENGTH_UTF16)}, 

229 ) 

230 

231 # Check if user has been sending host requests excessively 

232 if process_rate_limits_and_check_abort( 

233 session=session, user_id=context.user_id, action=RateLimitAction.host_request 

234 ): 

235 context.abort_with_error_code( 

236 grpc.StatusCode.RESOURCE_EXHAUSTED, 

237 "host_request_rate_limit", 

238 substitutions={"hours": str(RATE_LIMIT_HOURS)}, 

239 ) 

240 

241 conversation = Conversation() 

242 session.add(conversation) 

243 session.flush() 

244 

245 session.add( 

246 Message( 

247 conversation_id=conversation.id, 

248 author_id=context.user_id, 

249 message_type=MessageType.chat_created, 

250 ) 

251 ) 

252 

253 message = Message( 

254 conversation_id=conversation.id, 

255 author_id=context.user_id, 

256 text=request.text, 

257 message_type=MessageType.text, 

258 ) 

259 session.add(message) 

260 session.flush() 

261 

262 # Create moderation state for UMS (starts as SHADOWED) 

263 moderation_state = create_moderation( 

264 session=session, 

265 object_type=ModerationObjectType.HOST_REQUEST, 

266 object_id=conversation.id, 

267 creator_user_id=context.user_id, 

268 ) 

269 

270 host_request = HostRequest( 

271 conversation_id=conversation.id, 

272 surfer_user_id=context.user_id, 

273 host_user_id=host.id, 

274 moderation_state_id=moderation_state.id, 

275 from_date=from_date, 

276 to_date=to_date, 

277 status=HostRequestStatus.pending, 

278 surfer_last_seen_message_id=message.id, 

279 # TODO: tz 

280 # timezone=host.timezone, 

281 hosting_city=host.city, 

282 hosting_location=host.geom, 

283 hosting_radius=host.geom_radius, 

284 ) 

285 session.add(host_request) 

286 session.flush() 

287 

288 notify( 

289 session, 

290 user_id=host_request.host_user_id, 

291 topic_action=NotificationTopicAction.host_request__create, 

292 key=str(host_request.conversation_id), 

293 data=notification_data_pb2.HostRequestCreate( 

294 host_request=host_request_to_pb(host_request, session, context), 

295 surfer=user_model_to_pb(host_request.surfer, session, context), 

296 text=request.text, 

297 ), 

298 moderation_state_id=moderation_state.id, 

299 ) 

300 

301 host_requests_sent_counter.labels(user.gender, host.gender).inc() 

302 sent_messages_counter.labels(user.gender, "host request send").inc() 

303 account_age_on_host_request_create_histogram.labels(user.gender, host.gender).observe( 

304 (now() - user.joined).total_seconds() 

305 ) 

306 

307 return requests_pb2.CreateHostRequestRes(host_request_id=host_request.conversation_id) 

308 

309 def GetHostRequest( 

310 self, request: requests_pb2.GetHostRequestReq, context: CouchersContext, session: Session 

311 ) -> requests_pb2.HostRequest: 

312 host_request = session.execute( 

313 where_moderated_content_visible( 

314 where_users_column_visible( 

315 where_users_column_visible( 

316 select(HostRequest), 

317 context, 

318 HostRequest.surfer_user_id, 

319 ), 

320 context, 

321 HostRequest.host_user_id, 

322 ), 

323 context, 

324 HostRequest, 

325 is_list_operation=False, 

326 ) 

327 .where(HostRequest.conversation_id == request.host_request_id) 

328 .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id)) 

329 ).scalar_one_or_none() 

330 

331 if not host_request: 

332 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

333 

334 return host_request_to_pb(host_request, session, context) 

335 

336 def ListHostRequests( 

337 self, request: requests_pb2.ListHostRequestsReq, context: CouchersContext, session: Session 

338 ) -> requests_pb2.ListHostRequestsRes: 

339 if request.only_sent and request.only_received: 339 ↛ 340line 339 didn't jump to line 340 because the condition on line 339 was never true

340 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received") 

341 

342 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH 

343 pagination = min(pagination, MAX_PAGE_SIZE) 

344 

345 # By outer joining messages on itself where the second id is bigger, only the highest IDs will have 

346 # none as message_2.id. So just filter for these to get the highest messages only. 

347 # See https://stackoverflow.com/a/27802817/6115336 

348 message_2 = aliased(Message) 

349 statement = where_moderated_content_visible( 

350 where_users_column_visible( 

351 where_users_column_visible( 

352 select(Message, HostRequest, Conversation) 

353 .outerjoin( 

354 message_2, and_(Message.conversation_id == message_2.conversation_id, Message.id < message_2.id) 

355 ) 

356 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id) 

357 .join(Conversation, Conversation.id == HostRequest.conversation_id), 

358 context, 

359 HostRequest.surfer_user_id, 

360 ), 

361 context, 

362 HostRequest.host_user_id, 

363 ), 

364 context, 

365 HostRequest, 

366 is_list_operation=True, 

367 ).where(message_2.id == None) 

368 

369 if request.last_request_id != 0: 

370 statement = statement.where(Message.id < request.last_request_id) 

371 if request.only_sent: 

372 statement = statement.where(HostRequest.surfer_user_id == context.user_id) 

373 elif request.only_received: 

374 statement = statement.where(HostRequest.host_user_id == context.user_id) 

375 elif request.HasField("only_archived"): 

376 statement = statement.where( 

377 or_( 

378 and_( 

379 HostRequest.surfer_user_id == context.user_id, 

380 HostRequest.is_surfer_archived == request.only_archived, 

381 ), 

382 and_( 

383 HostRequest.host_user_id == context.user_id, 

384 HostRequest.is_host_archived == request.only_archived, 

385 ), 

386 ) 

387 ) 

388 else: 

389 statement = statement.where( 

390 or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id) 

391 ) 

392 

393 # TODO: I considered having the latest control message be the single source of truth for 

394 # the HostRequest.status, but decided against it because of this filter. 

395 # Another possibility is to filter in the python instead of SQL, but that's slower 

396 if request.only_active: 

397 statement = statement.where( 

398 or_( 

399 HostRequest.status == HostRequestStatus.pending, 

400 HostRequest.status == HostRequestStatus.accepted, 

401 HostRequest.status == HostRequestStatus.confirmed, 

402 ) 

403 ) 

404 statement = statement.where(HostRequest.end_time <= func.now()) 

405 

406 statement = statement.order_by(Message.id.desc()).limit(pagination + 1) 

407 results = session.execute(statement).all() 

408 

409 host_requests = [] 

410 for result in results[:pagination]: 

411 lat, lng = get_coordinates(result.HostRequest.hosting_location) 

412 host_requests.append( 

413 requests_pb2.HostRequest( 

414 host_request_id=result.HostRequest.conversation_id, 

415 surfer_user_id=result.HostRequest.surfer_user_id, 

416 host_user_id=result.HostRequest.host_user_id, 

417 status=hostrequeststatus2api[result.HostRequest.status], 

418 created=Timestamp_from_datetime(result.Conversation.created), 

419 from_date=date_to_api(result.HostRequest.from_date), 

420 to_date=date_to_api(result.HostRequest.to_date), 

421 last_seen_message_id=( 

422 result.HostRequest.surfer_last_seen_message_id 

423 if context.user_id == result.HostRequest.surfer_user_id 

424 else result.HostRequest.host_last_seen_message_id 

425 ), 

426 latest_message=message_to_pb(result.Message), 

427 hosting_city=result.HostRequest.hosting_city, 

428 hosting_lat=lat, 

429 hosting_lng=lng, 

430 hosting_radius=result.HostRequest.hosting_radius, 

431 ) 

432 ) 

433 

434 last_request_id = min(g.Message.id for g in results[:pagination]) if len(results) > pagination else 0 # TODO 

435 no_more = len(results) <= pagination 

436 

437 return requests_pb2.ListHostRequestsRes( 

438 last_request_id=last_request_id, no_more=no_more, host_requests=host_requests 

439 ) 

440 

441 def RespondHostRequest( 

442 self, request: requests_pb2.RespondHostRequestReq, context: CouchersContext, session: Session 

443 ) -> empty_pb2.Empty: 

444 def count_host_response(other_user_id: int, response_type: str) -> None: 

445 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one() 

446 other_gender = session.execute(select(User.gender).where(User.id == other_user_id)).scalar_one() 

447 host_request_responses_counter.labels(user_gender, other_gender, response_type).inc() 

448 sent_messages_counter.labels(user_gender, "host request response").inc() 

449 

450 host_request = session.execute( 

451 where_moderated_content_visible( 

452 where_users_column_visible( 

453 where_users_column_visible( 

454 select(HostRequest), 

455 context, 

456 HostRequest.surfer_user_id, 

457 ), 

458 context, 

459 HostRequest.host_user_id, 

460 ), 

461 context, 

462 HostRequest, 

463 is_list_operation=False, 

464 ).where(HostRequest.conversation_id == request.host_request_id) 

465 ).scalar_one_or_none() 

466 

467 if not host_request: 

468 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

469 

470 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 

471 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

472 

473 if request.status == conversations_pb2.HOST_REQUEST_STATUS_PENDING: 

474 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

475 

476 if host_request.end_time < now(): 476 ↛ 477line 476 didn't jump to line 477 because the condition on line 476 was never true

477 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_in_past") 

478 

479 control_message = Message( 

480 message_type=MessageType.host_request_status_changed, 

481 conversation_id=host_request.conversation_id, 

482 author_id=context.user_id, 

483 ) 

484 

485 if request.status == conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED: 

486 # only host can accept 

487 if context.user_id != host_request.host_user_id: 

488 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "not_the_host") 

489 # can't accept a cancelled or confirmed request (only reject), or already accepted 

490 if ( 490 ↛ 495line 490 didn't jump to line 495 because the condition on line 490 was never true

491 host_request.status == HostRequestStatus.cancelled 

492 or host_request.status == HostRequestStatus.confirmed 

493 or host_request.status == HostRequestStatus.accepted 

494 ): 

495 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

496 _possibly_observe_first_response_time(session, host_request, context.user_id, "accepted") 

497 control_message.host_request_status_target = HostRequestStatus.accepted 

498 host_request.status = HostRequestStatus.accepted 

499 session.flush() 

500 

501 notify( 

502 session, 

503 user_id=host_request.surfer_user_id, 

504 topic_action=NotificationTopicAction.host_request__accept, 

505 key=str(host_request.conversation_id), 

506 data=notification_data_pb2.HostRequestAccept( 

507 host_request=host_request_to_pb(host_request, session, context), 

508 host=user_model_to_pb(host_request.host, session, context), 

509 ), 

510 moderation_state_id=host_request.moderation_state_id, 

511 ) 

512 

513 count_host_response(host_request.surfer_user_id, "accepted") 

514 

515 if request.status == conversations_pb2.HOST_REQUEST_STATUS_REJECTED: 

516 # only host can reject 

517 if context.user_id != host_request.host_user_id: 517 ↛ 518line 517 didn't jump to line 518 because the condition on line 517 was never true

518 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

519 # can't reject a cancelled or already rejected request 

520 if host_request.status == HostRequestStatus.cancelled or host_request.status == HostRequestStatus.rejected: 520 ↛ 521line 520 didn't jump to line 521 because the condition on line 520 was never true

521 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

522 _possibly_observe_first_response_time(session, host_request, context.user_id, "rejected") 

523 control_message.host_request_status_target = HostRequestStatus.rejected 

524 host_request.status = HostRequestStatus.rejected 

525 session.flush() 

526 

527 notify( 

528 session, 

529 user_id=host_request.surfer_user_id, 

530 topic_action=NotificationTopicAction.host_request__reject, 

531 key=str(host_request.conversation_id), 

532 data=notification_data_pb2.HostRequestReject( 

533 host_request=host_request_to_pb(host_request, session, context), 

534 host=user_model_to_pb(host_request.host, session, context), 

535 ), 

536 moderation_state_id=host_request.moderation_state_id, 

537 ) 

538 

539 count_host_response(host_request.surfer_user_id, "rejected") 

540 

541 if request.status == conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED: 

542 # only surfer can confirm 

543 if context.user_id != host_request.surfer_user_id: 

544 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

545 # can only confirm an accepted request 

546 if host_request.status != HostRequestStatus.accepted: 

547 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

548 control_message.host_request_status_target = HostRequestStatus.confirmed 

549 host_request.status = HostRequestStatus.confirmed 

550 session.flush() 

551 

552 notify( 

553 session, 

554 user_id=host_request.host_user_id, 

555 topic_action=NotificationTopicAction.host_request__confirm, 

556 key=str(host_request.conversation_id), 

557 data=notification_data_pb2.HostRequestConfirm( 

558 host_request=host_request_to_pb(host_request, session, context), 

559 surfer=user_model_to_pb(host_request.surfer, session, context), 

560 ), 

561 moderation_state_id=host_request.moderation_state_id, 

562 ) 

563 

564 count_host_response(host_request.host_user_id, "confirmed") 

565 

566 if request.status == conversations_pb2.HOST_REQUEST_STATUS_CANCELLED: 

567 # only surfer can cancel 

568 if context.user_id != host_request.surfer_user_id: 

569 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

570 # can't' cancel an already cancelled or rejected request 

571 if host_request.status == HostRequestStatus.rejected or host_request.status == HostRequestStatus.cancelled: 571 ↛ 572line 571 didn't jump to line 572 because the condition on line 571 was never true

572 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

573 control_message.host_request_status_target = HostRequestStatus.cancelled 

574 host_request.status = HostRequestStatus.cancelled 

575 session.flush() 

576 

577 notify( 

578 session, 

579 user_id=host_request.host_user_id, 

580 topic_action=NotificationTopicAction.host_request__cancel, 

581 key=str(host_request.conversation_id), 

582 data=notification_data_pb2.HostRequestCancel( 

583 host_request=host_request_to_pb(host_request, session, context), 

584 surfer=user_model_to_pb(host_request.surfer, session, context), 

585 ), 

586 moderation_state_id=host_request.moderation_state_id, 

587 ) 

588 

589 count_host_response(host_request.host_user_id, "cancelled") 

590 

591 session.add(control_message) 

592 

593 if request.text: 

594 latest_message = Message( 

595 conversation_id=host_request.conversation_id, 

596 text=request.text, 

597 author_id=context.user_id, 

598 message_type=MessageType.text, 

599 ) 

600 

601 session.add(latest_message) 

602 else: 

603 latest_message = control_message 

604 

605 session.flush() 

606 

607 if host_request.surfer_user_id == context.user_id: 

608 host_request.surfer_last_seen_message_id = latest_message.id 

609 else: 

610 host_request.host_last_seen_message_id = latest_message.id 

611 session.commit() 

612 

613 return empty_pb2.Empty() 

614 

615 def GetHostRequestMessages( 

616 self, request: requests_pb2.GetHostRequestMessagesReq, context: CouchersContext, session: Session 

617 ) -> requests_pb2.GetHostRequestMessagesRes: 

618 host_request = session.execute( 

619 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False).where( 

620 HostRequest.conversation_id == request.host_request_id 

621 ) 

622 ).scalar_one_or_none() 

623 

624 if not host_request: 624 ↛ 625line 624 didn't jump to line 625 because the condition on line 624 was never true

625 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

626 

627 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 627 ↛ 628line 627 didn't jump to line 628 because the condition on line 627 was never true

628 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

629 

630 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH 

631 pagination = min(pagination, MAX_PAGE_SIZE) 

632 

633 messages = ( 

634 session.execute( 

635 select(Message) 

636 .where(Message.conversation_id == host_request.conversation_id) 

637 .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0))) 

638 .order_by(Message.id.desc()) 

639 .limit(pagination + 1) 

640 ) 

641 .scalars() 

642 .all() 

643 ) 

644 

645 no_more = len(messages) <= pagination 

646 

647 last_message_id = min(m.id if m else 1 for m in messages[:pagination]) if len(messages) > 0 else 0 

648 

649 return requests_pb2.GetHostRequestMessagesRes( 

650 last_message_id=last_message_id, 

651 no_more=no_more, 

652 messages=[message_to_pb(message) for message in messages[:pagination]], 

653 ) 

654 

655 def SendHostRequestMessage( 

656 self, request: requests_pb2.SendHostRequestMessageReq, context: CouchersContext, session: Session 

657 ) -> empty_pb2.Empty: 

658 if request.text == "": 

659 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message") 

660 host_request = session.execute( 

661 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False).where( 

662 HostRequest.conversation_id == request.host_request_id 

663 ) 

664 ).scalar_one_or_none() 

665 

666 if not host_request: 

667 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

668 

669 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 

670 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

671 

672 if host_request.host_user_id == context.user_id: 

673 _possibly_observe_first_response_time(session, host_request, context.user_id, "message") 

674 

675 message = Message( 

676 conversation_id=host_request.conversation_id, 

677 author_id=context.user_id, 

678 message_type=MessageType.text, 

679 text=request.text, 

680 ) 

681 

682 session.add(message) 

683 session.flush() 

684 

685 if host_request.surfer_user_id == context.user_id: 

686 host_request.surfer_last_seen_message_id = message.id 

687 

688 notify( 

689 session, 

690 user_id=host_request.host_user_id, 

691 topic_action=NotificationTopicAction.host_request__message, 

692 key=str(host_request.conversation_id), 

693 data=notification_data_pb2.HostRequestMessage( 

694 host_request=host_request_to_pb(host_request, session, context), 

695 user=user_model_to_pb(host_request.surfer, session, context), 

696 text=request.text, 

697 am_host=True, 

698 ), 

699 moderation_state_id=host_request.moderation_state_id, 

700 ) 

701 

702 else: 

703 host_request.host_last_seen_message_id = message.id 

704 

705 notify( 

706 session, 

707 user_id=host_request.surfer_user_id, 

708 topic_action=NotificationTopicAction.host_request__message, 

709 key=str(host_request.conversation_id), 

710 data=notification_data_pb2.HostRequestMessage( 

711 host_request=host_request_to_pb(host_request, session, context), 

712 user=user_model_to_pb(host_request.host, session, context), 

713 text=request.text, 

714 am_host=False, 

715 ), 

716 moderation_state_id=host_request.moderation_state_id, 

717 ) 

718 

719 session.commit() 

720 

721 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one() 

722 sent_messages_counter.labels(user_gender, "host request").inc() 

723 

724 return empty_pb2.Empty() 

725 

726 def GetHostRequestUpdates( 

727 self, request: requests_pb2.GetHostRequestUpdatesReq, context: CouchersContext, session: Session 

728 ) -> requests_pb2.GetHostRequestUpdatesRes: 

729 if request.only_sent and request.only_received: 729 ↛ 730line 729 didn't jump to line 730 because the condition on line 729 was never true

730 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received") 

731 

732 if request.newest_message_id == 0: 

733 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message") 

734 

735 if not session.execute(select(Message).where(Message.id == request.newest_message_id)).scalar_one_or_none(): 735 ↛ 736line 735 didn't jump to line 736 because the condition on line 735 was never true

736 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message") 

737 

738 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH 

739 pagination = min(pagination, MAX_PAGE_SIZE) 

740 

741 statement = where_moderated_content_visible( 

742 select( 

743 Message, 

744 HostRequest.status.label("host_request_status"), 

745 HostRequest.conversation_id.label("host_request_id"), 

746 ) 

747 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id) 

748 .where(Message.id > request.newest_message_id), 

749 context, 

750 HostRequest, 

751 is_list_operation=False, 

752 ) 

753 

754 if request.only_sent: 754 ↛ 755line 754 didn't jump to line 755 because the condition on line 754 was never true

755 statement = statement.where(HostRequest.surfer_user_id == context.user_id) 

756 elif request.only_received: 756 ↛ 757line 756 didn't jump to line 757 because the condition on line 756 was never true

757 statement = statement.where(HostRequest.host_user_id == context.user_id) 

758 else: 

759 statement = statement.where( 

760 or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id) 

761 ) 

762 

763 statement = statement.order_by(Message.id.asc()).limit(pagination + 1) 

764 res = session.execute(statement).all() 

765 

766 no_more = len(res) <= pagination 

767 

768 last_message_id = min(m.Message.id if m else 1 for m in res[:pagination]) if len(res) > 0 else 0 # TODO 

769 

770 return requests_pb2.GetHostRequestUpdatesRes( 

771 no_more=no_more, 

772 updates=[ 

773 requests_pb2.HostRequestUpdate( 

774 host_request_id=result.host_request_id, 

775 status=hostrequeststatus2api[result.host_request_status], 

776 message=message_to_pb(result.Message), 

777 ) 

778 for result in res[:pagination] 

779 ], 

780 ) 

781 

782 def MarkLastSeenHostRequest( 

783 self, request: requests_pb2.MarkLastSeenHostRequestReq, context: CouchersContext, session: Session 

784 ) -> empty_pb2.Empty: 

785 host_request = session.execute( 

786 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False).where( 

787 HostRequest.conversation_id == request.host_request_id 

788 ) 

789 ).scalar_one_or_none() 

790 

791 if not host_request: 791 ↛ 792line 791 didn't jump to line 792 because the condition on line 791 was never true

792 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

793 

794 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 794 ↛ 795line 794 didn't jump to line 795 because the condition on line 794 was never true

795 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

796 

797 if host_request.surfer_user_id == context.user_id: 797 ↛ 798line 797 didn't jump to line 798 because the condition on line 797 was never true

798 if not host_request.surfer_last_seen_message_id <= request.last_seen_message_id: 

799 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages") 

800 host_request.surfer_last_seen_message_id = request.last_seen_message_id 

801 else: 

802 if not host_request.host_last_seen_message_id <= request.last_seen_message_id: 

803 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages") 

804 host_request.host_last_seen_message_id = request.last_seen_message_id 

805 

806 session.commit() 

807 return empty_pb2.Empty() 

808 

809 def SetHostRequestArchiveStatus( 

810 self, request: requests_pb2.SetHostRequestArchiveStatusReq, context: CouchersContext, session: Session 

811 ) -> requests_pb2.SetHostRequestArchiveStatusRes: 

812 host_request = session.execute( 

813 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False) 

814 .where(HostRequest.conversation_id == request.host_request_id) 

815 .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id)) 

816 ).scalar_one_or_none() 

817 

818 if not host_request: 818 ↛ 819line 818 didn't jump to line 819 because the condition on line 818 was never true

819 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

820 

821 if context.user_id == host_request.surfer_user_id: 821 ↛ 824line 821 didn't jump to line 824 because the condition on line 821 was always true

822 host_request.is_surfer_archived = request.is_archived 

823 else: 

824 host_request.is_host_archived = request.is_archived 

825 

826 return requests_pb2.SetHostRequestArchiveStatusRes( 

827 host_request_id=host_request.conversation_id, 

828 is_archived=request.is_archived, 

829 ) 

830 

831 def GetResponseRate( 

832 self, request: requests_pb2.GetResponseRateReq, context: CouchersContext, session: Session 

833 ) -> requests_pb2.GetResponseRateRes: 

834 user_res = session.execute( 

835 select(User.id, UserResponseRate) 

836 .outerjoin(UserResponseRate, UserResponseRate.user_id == User.id) 

837 .where(users_visible(context, User)) 

838 .where(User.id == request.user_id) 

839 ).one_or_none() 

840 

841 # if user doesn't exist, return None 

842 if not user_res: 

843 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

844 

845 user, response_rates = user_res 

846 return requests_pb2.GetResponseRateRes(**response_rate_to_pb(response_rates)) # type: ignore[arg-type] 

847 

848 def SendHostRequestFeedback( 

849 self, request: requests_pb2.SendHostRequestFeedbackReq, context: CouchersContext, session: Session 

850 ) -> empty_pb2.Empty: 

851 host_request = session.execute( 

852 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False) 

853 .where(HostRequest.conversation_id == request.host_request_id) 

854 .where(HostRequest.host_user_id == context.user_id) 

855 ).scalar_one_or_none() 

856 

857 if not host_request: 

858 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

859 

860 feedback = session.execute( 

861 select(HostRequestFeedback) 

862 .where(HostRequestFeedback.host_request_id == host_request.conversation_id) 

863 .where(HostRequestFeedback.from_user_id == context.user_id) 

864 ).scalar_one_or_none() 

865 

866 if feedback: 

867 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_left_host_request_feedback") 

868 

869 session.add( 

870 HostRequestFeedback( 

871 host_request_id=host_request.conversation_id, 

872 from_user_id=host_request.host_user_id, 

873 to_user_id=host_request.surfer_user_id, 

874 request_quality=hostrequestquality2sql.get(request.host_request_quality), 

875 decline_reason=request.decline_reason, 

876 ) 

877 ) 

878 

879 return empty_pb2.Empty()