Coverage for src / couchers / servicers / requests.py: 91%

295 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-07 11:26 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy import exists, select 

7from sqlalchemy.orm import Session, aliased 

8from sqlalchemy.sql import and_, func, or_ 

9 

10from couchers.constants import HOST_REQUEST_MIN_LENGTH_UTF16 

11from couchers.context import CouchersContext 

12from couchers.materialized_views import UserResponseRate 

13from couchers.metrics import ( 

14 account_age_on_host_request_create_histogram, 

15 host_request_first_response_histogram, 

16 host_request_responses_counter, 

17 host_requests_sent_counter, 

18 sent_messages_counter, 

19) 

20from couchers.models import ( 

21 Conversation, 

22 HostRequest, 

23 HostRequestFeedback, 

24 HostRequestQuality, 

25 HostRequestStatus, 

26 Message, 

27 MessageType, 

28 ModerationObjectType, 

29 RateLimitAction, 

30 User, 

31) 

32from couchers.moderation.utils import create_moderation 

33from couchers.notifications.notify import notify 

34from couchers.proto import conversations_pb2, notification_data_pb2, requests_pb2, requests_pb2_grpc 

35from couchers.rate_limits.check import process_rate_limits_and_check_abort 

36from couchers.rate_limits.definitions import RATE_LIMIT_HOURS 

37from couchers.servicers.api import response_rate_to_pb, user_model_to_pb 

38from couchers.sql import to_bool, users_visible, where_moderated_content_visible, where_users_column_visible 

39from couchers.utils import ( 

40 Timestamp_from_datetime, 

41 date_to_api, 

42 get_coordinates, 

43 now, 

44 parse_date, 

45 today_in_timezone, 

46) 

47 

48logger = logging.getLogger(__name__) 

49 

50DEFAULT_PAGINATION_LENGTH = 10 

51MAX_PAGE_SIZE = 50 

52 

53 

54hostrequeststatus2api = { 

55 HostRequestStatus.pending: conversations_pb2.HOST_REQUEST_STATUS_PENDING, 

56 HostRequestStatus.accepted: conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED, 

57 HostRequestStatus.rejected: conversations_pb2.HOST_REQUEST_STATUS_REJECTED, 

58 HostRequestStatus.confirmed: conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED, 

59 HostRequestStatus.cancelled: conversations_pb2.HOST_REQUEST_STATUS_CANCELLED, 

60} 

61 

62hostrequestquality2sql = { 

63 requests_pb2.HOST_REQUEST_QUALITY_UNSPECIFIED: HostRequestQuality.high_quality, 

64 requests_pb2.HOST_REQUEST_QUALITY_LOW: HostRequestQuality.okay_quality, 

65 requests_pb2.HOST_REQUEST_QUALITY_OKAY: HostRequestQuality.low_quality, 

66} 

67 

68 

69def message_to_pb(message: Message) -> conversations_pb2.Message: 

70 """ 

71 Turns the given message to a protocol buffer 

72 """ 

73 if message.is_normal_message: 

74 return conversations_pb2.Message( 

75 message_id=message.id, 

76 author_user_id=message.author_id, 

77 time=Timestamp_from_datetime(message.time), 

78 text=conversations_pb2.MessageContentText(text=message.text), 

79 ) 

80 else: 

81 return conversations_pb2.Message( 

82 message_id=message.id, 

83 author_user_id=message.author_id, 

84 time=Timestamp_from_datetime(message.time), 

85 chat_created=( 

86 conversations_pb2.MessageContentChatCreated() 

87 if message.message_type == MessageType.chat_created 

88 else None 

89 ), 

90 host_request_status_changed=( 

91 conversations_pb2.MessageContentHostRequestStatusChanged( 

92 status=hostrequeststatus2api[message.host_request_status_target] # type: ignore[index] 

93 ) 

94 if message.message_type == MessageType.host_request_status_changed 

95 else None 

96 ), 

97 ) 

98 

99 

100def host_request_to_pb( 

101 host_request: HostRequest, session: Session, context: CouchersContext 

102) -> requests_pb2.HostRequest: 

103 initial_message = session.execute( 

104 select(Message) 

105 .where(Message.conversation_id == host_request.conversation_id) 

106 .order_by(Message.id.asc()) 

107 .limit(1) 

108 ).scalar_one() 

109 

110 latest_message = session.execute( 

111 select(Message) 

112 .where(Message.conversation_id == host_request.conversation_id) 

113 .order_by(Message.id.desc()) 

114 .limit(1) 

115 ).scalar_one() 

116 

117 lat, lng = get_coordinates(host_request.hosting_location) 

118 

119 need_feedback = False 

120 if context.user_id == host_request.host_user_id and host_request.status == HostRequestStatus.rejected: 

121 need_feedback = not session.execute( 

122 select( 

123 exists().where( 

124 HostRequestFeedback.from_user_id == context.user_id, 

125 HostRequestFeedback.host_request_id == host_request.conversation_id, 

126 ) 

127 ) 

128 ).scalar_one() 

129 

130 return requests_pb2.HostRequest( 

131 host_request_id=host_request.conversation_id, 

132 surfer_user_id=host_request.surfer_user_id, 

133 host_user_id=host_request.host_user_id, 

134 status=hostrequeststatus2api[host_request.status], 

135 created=Timestamp_from_datetime(initial_message.time), 

136 from_date=date_to_api(host_request.from_date), 

137 to_date=date_to_api(host_request.to_date), 

138 last_seen_message_id=( 

139 host_request.surfer_last_seen_message_id 

140 if context.user_id == host_request.surfer_user_id 

141 else host_request.host_last_seen_message_id 

142 ), 

143 latest_message=message_to_pb(latest_message), 

144 hosting_city=host_request.hosting_city, 

145 hosting_lat=lat, 

146 hosting_lng=lng, 

147 hosting_radius=host_request.hosting_radius, 

148 need_host_request_feedback=need_feedback, 

149 ) 

150 

151 

152def _possibly_observe_first_response_time( 

153 session: Session, host_request: HostRequest, user_id: int, response_type: str 

154) -> None: 

155 # if this is the first response then there's nothing by this user yet 

156 assert host_request.host_user_id == user_id 

157 

158 number_messages_by_host = session.execute( 

159 select(func.count()) 

160 .where(Message.conversation_id == host_request.conversation_id) 

161 .where(Message.author_id == user_id) 

162 ).scalar_one_or_none() 

163 

164 if number_messages_by_host == 0: 

165 host_gender = session.execute(select(User.gender).where(User.id == host_request.host_user_id)).scalar_one() 

166 surfer_gender = session.execute(select(User.gender).where(User.id == host_request.surfer_user_id)).scalar_one() 

167 host_request_first_response_histogram.labels(host_gender, surfer_gender, response_type).observe( 

168 (now() - host_request.conversation.created).total_seconds() 

169 ) 

170 

171 

172def _is_host_request_long_enough(text: str) -> bool: 

173 # Python's len(str) does not match Javascript's string.length. 

174 # e.g. len("é") == 2 but "é".length == 1. 

175 # To match the frontend's validation, measure the string in utf16 code units. 

176 text_length_utf16 = len(text.encode("utf-16-le")) // 2 # utf-16-le does not include a prefix BOM code unit. 

177 return text_length_utf16 >= HOST_REQUEST_MIN_LENGTH_UTF16 

178 

179 

180class Requests(requests_pb2_grpc.RequestsServicer): 

181 def CreateHostRequest( 

182 self, request: requests_pb2.CreateHostRequestReq, context: CouchersContext, session: Session 

183 ) -> requests_pb2.CreateHostRequestRes: 

184 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

185 if not user.has_completed_profile: 

186 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_request") 

187 

188 if request.host_user_id == context.user_id: 

189 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_request_self") 

190 

191 # just to check host exists and is visible 

192 host = session.execute( 

193 select(User).where(users_visible(context, User)).where(User.id == request.host_user_id) 

194 ).scalar_one_or_none() 

195 if not host: 

196 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

197 

198 from_date = parse_date(request.from_date) 

199 to_date = parse_date(request.to_date) 

200 

201 if not from_date or not to_date: 

202 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_date") 

203 

204 today = today_in_timezone(host.timezone) 

205 

206 # request starts from the past 

207 if from_date < today: 

208 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_before_today") 

209 

210 # from_date is not >= to_date 

211 if from_date >= to_date: 

212 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_to") 

213 

214 # No need to check today > to_date 

215 

216 if from_date - today > timedelta(days=365): 

217 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_one_year") 

218 

219 if to_date - from_date > timedelta(days=365): 

220 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_to_after_one_year") 

221 

222 # Check minimum length 

223 if not _is_host_request_long_enough(request.text): 

224 context.abort_with_error_code( 

225 grpc.StatusCode.INVALID_ARGUMENT, 

226 "host_request_too_short", 

227 substitutions={"chars": str(HOST_REQUEST_MIN_LENGTH_UTF16)}, 

228 ) 

229 

230 # Check if user has been sending host requests excessively 

231 if process_rate_limits_and_check_abort( 

232 session=session, user_id=context.user_id, action=RateLimitAction.host_request 

233 ): 

234 context.abort_with_error_code( 

235 grpc.StatusCode.RESOURCE_EXHAUSTED, 

236 "host_request_rate_limit", 

237 substitutions={"hours": str(RATE_LIMIT_HOURS)}, 

238 ) 

239 

240 conversation = Conversation() 

241 session.add(conversation) 

242 session.flush() 

243 

244 session.add( 

245 Message( 

246 conversation_id=conversation.id, 

247 author_id=context.user_id, 

248 message_type=MessageType.chat_created, 

249 ) 

250 ) 

251 

252 message = Message( 

253 conversation_id=conversation.id, 

254 author_id=context.user_id, 

255 text=request.text, 

256 message_type=MessageType.text, 

257 ) 

258 session.add(message) 

259 session.flush() 

260 

261 # Create moderation state for UMS (starts as SHADOWED) 

262 moderation_state = create_moderation( 

263 session=session, 

264 object_type=ModerationObjectType.HOST_REQUEST, 

265 object_id=conversation.id, 

266 creator_user_id=context.user_id, 

267 ) 

268 

269 host_request = HostRequest( 

270 conversation_id=conversation.id, 

271 surfer_user_id=context.user_id, 

272 host_user_id=host.id, 

273 moderation_state_id=moderation_state.id, 

274 from_date=from_date, 

275 to_date=to_date, 

276 status=HostRequestStatus.pending, 

277 surfer_last_seen_message_id=message.id, 

278 # TODO: tz 

279 # timezone=host.timezone, 

280 hosting_city=host.city, 

281 hosting_location=host.geom, 

282 hosting_radius=host.geom_radius, 

283 ) 

284 session.add(host_request) 

285 session.flush() 

286 

287 notify( 

288 session, 

289 user_id=host_request.host_user_id, 

290 topic_action="host_request:create", 

291 key=str(host_request.conversation_id), 

292 data=notification_data_pb2.HostRequestCreate( 

293 host_request=host_request_to_pb(host_request, session, context), 

294 surfer=user_model_to_pb(host_request.surfer, session, context), 

295 text=request.text, 

296 ), 

297 moderation_state_id=moderation_state.id, 

298 ) 

299 

300 host_requests_sent_counter.labels(user.gender, host.gender).inc() 

301 sent_messages_counter.labels(user.gender, "host request send").inc() 

302 account_age_on_host_request_create_histogram.labels(user.gender, host.gender).observe( 

303 (now() - user.joined).total_seconds() 

304 ) 

305 

306 return requests_pb2.CreateHostRequestRes(host_request_id=host_request.conversation_id) 

307 

308 def GetHostRequest( 

309 self, request: requests_pb2.GetHostRequestReq, context: CouchersContext, session: Session 

310 ) -> requests_pb2.HostRequest: 

311 host_request = session.execute( 

312 where_moderated_content_visible( 

313 where_users_column_visible( 

314 where_users_column_visible( 

315 select(HostRequest), 

316 context, 

317 HostRequest.surfer_user_id, 

318 ), 

319 context, 

320 HostRequest.host_user_id, 

321 ), 

322 context, 

323 HostRequest, 

324 is_list_operation=False, 

325 ) 

326 .where(HostRequest.conversation_id == request.host_request_id) 

327 .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id)) 

328 ).scalar_one_or_none() 

329 

330 if not host_request: 

331 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

332 

333 return host_request_to_pb(host_request, session, context) 

334 

335 def ListHostRequests( 

336 self, request: requests_pb2.ListHostRequestsReq, context: CouchersContext, session: Session 

337 ) -> requests_pb2.ListHostRequestsRes: 

338 if request.only_sent and request.only_received: 338 ↛ 339line 338 didn't jump to line 339 because the condition on line 338 was never true

339 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received") 

340 

341 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH 

342 pagination = min(pagination, MAX_PAGE_SIZE) 

343 

344 # By outer joining messages on itself where the second id is bigger, only the highest IDs will have 

345 # none as message_2.id. So just filter for these to get the highest messages only. 

346 # See https://stackoverflow.com/a/27802817/6115336 

347 message_2 = aliased(Message) 

348 statement = where_moderated_content_visible( 

349 where_users_column_visible( 

350 where_users_column_visible( 

351 select(Message, HostRequest, Conversation) 

352 .outerjoin( 

353 message_2, and_(Message.conversation_id == message_2.conversation_id, Message.id < message_2.id) 

354 ) 

355 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id) 

356 .join(Conversation, Conversation.id == HostRequest.conversation_id), 

357 context, 

358 HostRequest.surfer_user_id, 

359 ), 

360 context, 

361 HostRequest.host_user_id, 

362 ), 

363 context, 

364 HostRequest, 

365 is_list_operation=True, 

366 ).where(message_2.id == None) 

367 

368 if request.last_request_id != 0: 

369 statement = statement.where(Message.id < request.last_request_id) 

370 if request.only_sent: 

371 statement = statement.where(HostRequest.surfer_user_id == context.user_id) 

372 elif request.only_received: 

373 statement = statement.where(HostRequest.host_user_id == context.user_id) 

374 elif request.HasField("only_archived"): 

375 statement = statement.where( 

376 or_( 

377 and_( 

378 HostRequest.surfer_user_id == context.user_id, 

379 HostRequest.is_surfer_archived == request.only_archived, 

380 ), 

381 and_( 

382 HostRequest.host_user_id == context.user_id, 

383 HostRequest.is_host_archived == request.only_archived, 

384 ), 

385 ) 

386 ) 

387 else: 

388 statement = statement.where( 

389 or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id) 

390 ) 

391 

392 # TODO: I considered having the latest control message be the single source of truth for 

393 # the HostRequest.status, but decided against it because of this filter. 

394 # Another possibility is to filter in the python instead of SQL, but that's slower 

395 if request.only_active: 

396 statement = statement.where( 

397 or_( 

398 HostRequest.status == HostRequestStatus.pending, 

399 HostRequest.status == HostRequestStatus.accepted, 

400 HostRequest.status == HostRequestStatus.confirmed, 

401 ) 

402 ) 

403 statement = statement.where(HostRequest.end_time <= func.now()) 

404 

405 statement = statement.order_by(Message.id.desc()).limit(pagination + 1) 

406 results = session.execute(statement).all() 

407 

408 host_requests = [] 

409 for result in results[:pagination]: 

410 lat, lng = get_coordinates(result.HostRequest.hosting_location) 

411 host_requests.append( 

412 requests_pb2.HostRequest( 

413 host_request_id=result.HostRequest.conversation_id, 

414 surfer_user_id=result.HostRequest.surfer_user_id, 

415 host_user_id=result.HostRequest.host_user_id, 

416 status=hostrequeststatus2api[result.HostRequest.status], 

417 created=Timestamp_from_datetime(result.Conversation.created), 

418 from_date=date_to_api(result.HostRequest.from_date), 

419 to_date=date_to_api(result.HostRequest.to_date), 

420 last_seen_message_id=( 

421 result.HostRequest.surfer_last_seen_message_id 

422 if context.user_id == result.HostRequest.surfer_user_id 

423 else result.HostRequest.host_last_seen_message_id 

424 ), 

425 latest_message=message_to_pb(result.Message), 

426 hosting_city=result.HostRequest.hosting_city, 

427 hosting_lat=lat, 

428 hosting_lng=lng, 

429 hosting_radius=result.HostRequest.hosting_radius, 

430 ) 

431 ) 

432 

433 last_request_id = min(g.Message.id for g in results[:pagination]) if len(results) > pagination else 0 # TODO 

434 no_more = len(results) <= pagination 

435 

436 return requests_pb2.ListHostRequestsRes( 

437 last_request_id=last_request_id, no_more=no_more, host_requests=host_requests 

438 ) 

439 

440 def RespondHostRequest( 

441 self, request: requests_pb2.RespondHostRequestReq, context: CouchersContext, session: Session 

442 ) -> empty_pb2.Empty: 

443 def count_host_response(other_user_id: int, response_type: str) -> None: 

444 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one() 

445 other_gender = session.execute(select(User.gender).where(User.id == other_user_id)).scalar_one() 

446 host_request_responses_counter.labels(user_gender, other_gender, response_type).inc() 

447 sent_messages_counter.labels(user_gender, "host request response").inc() 

448 

449 host_request = session.execute( 

450 where_moderated_content_visible( 

451 where_users_column_visible( 

452 where_users_column_visible( 

453 select(HostRequest), 

454 context, 

455 HostRequest.surfer_user_id, 

456 ), 

457 context, 

458 HostRequest.host_user_id, 

459 ), 

460 context, 

461 HostRequest, 

462 is_list_operation=False, 

463 ).where(HostRequest.conversation_id == request.host_request_id) 

464 ).scalar_one_or_none() 

465 

466 if not host_request: 

467 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

468 

469 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 

470 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

471 

472 if request.status == conversations_pb2.HOST_REQUEST_STATUS_PENDING: 

473 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

474 

475 if host_request.end_time < now(): 475 ↛ 476line 475 didn't jump to line 476 because the condition on line 475 was never true

476 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_in_past") 

477 

478 control_message = Message() 

479 

480 if request.status == conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED: 

481 # only host can accept 

482 if context.user_id != host_request.host_user_id: 

483 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "not_the_host") 

484 # can't accept a cancelled or confirmed request (only reject), or already accepted 

485 if ( 485 ↛ 490line 485 didn't jump to line 490 because the condition on line 485 was never true

486 host_request.status == HostRequestStatus.cancelled 

487 or host_request.status == HostRequestStatus.confirmed 

488 or host_request.status == HostRequestStatus.accepted 

489 ): 

490 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

491 _possibly_observe_first_response_time(session, host_request, context.user_id, "accepted") 

492 control_message.host_request_status_target = HostRequestStatus.accepted 

493 host_request.status = HostRequestStatus.accepted 

494 session.flush() 

495 

496 notify( 

497 session, 

498 user_id=host_request.surfer_user_id, 

499 topic_action="host_request:accept", 

500 key=str(host_request.conversation_id), 

501 data=notification_data_pb2.HostRequestAccept( 

502 host_request=host_request_to_pb(host_request, session, context), 

503 host=user_model_to_pb(host_request.host, session, context), 

504 ), 

505 moderation_state_id=host_request.moderation_state_id, 

506 ) 

507 

508 count_host_response(host_request.surfer_user_id, "accepted") 

509 

510 if request.status == conversations_pb2.HOST_REQUEST_STATUS_REJECTED: 

511 # only host can reject 

512 if context.user_id != host_request.host_user_id: 512 ↛ 513line 512 didn't jump to line 513 because the condition on line 512 was never true

513 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

514 # can't reject a cancelled or already rejected request 

515 if host_request.status == HostRequestStatus.cancelled or host_request.status == HostRequestStatus.rejected: 515 ↛ 516line 515 didn't jump to line 516 because the condition on line 515 was never true

516 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

517 _possibly_observe_first_response_time(session, host_request, context.user_id, "rejected") 

518 control_message.host_request_status_target = HostRequestStatus.rejected 

519 host_request.status = HostRequestStatus.rejected 

520 session.flush() 

521 

522 notify( 

523 session, 

524 user_id=host_request.surfer_user_id, 

525 topic_action="host_request:reject", 

526 key=str(host_request.conversation_id), 

527 data=notification_data_pb2.HostRequestReject( 

528 host_request=host_request_to_pb(host_request, session, context), 

529 host=user_model_to_pb(host_request.host, session, context), 

530 ), 

531 moderation_state_id=host_request.moderation_state_id, 

532 ) 

533 

534 count_host_response(host_request.surfer_user_id, "rejected") 

535 

536 if request.status == conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED: 

537 # only surfer can confirm 

538 if context.user_id != host_request.surfer_user_id: 

539 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

540 # can only confirm an accepted request 

541 if host_request.status != HostRequestStatus.accepted: 

542 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

543 control_message.host_request_status_target = HostRequestStatus.confirmed 

544 host_request.status = HostRequestStatus.confirmed 

545 session.flush() 

546 

547 notify( 

548 session, 

549 user_id=host_request.host_user_id, 

550 topic_action="host_request:confirm", 

551 key=str(host_request.conversation_id), 

552 data=notification_data_pb2.HostRequestConfirm( 

553 host_request=host_request_to_pb(host_request, session, context), 

554 surfer=user_model_to_pb(host_request.surfer, session, context), 

555 ), 

556 moderation_state_id=host_request.moderation_state_id, 

557 ) 

558 

559 count_host_response(host_request.host_user_id, "confirmed") 

560 

561 if request.status == conversations_pb2.HOST_REQUEST_STATUS_CANCELLED: 

562 # only surfer can cancel 

563 if context.user_id != host_request.surfer_user_id: 

564 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

565 # can't' cancel an already cancelled or rejected request 

566 if host_request.status == HostRequestStatus.rejected or host_request.status == HostRequestStatus.cancelled: 566 ↛ 567line 566 didn't jump to line 567 because the condition on line 566 was never true

567 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

568 control_message.host_request_status_target = HostRequestStatus.cancelled 

569 host_request.status = HostRequestStatus.cancelled 

570 session.flush() 

571 

572 notify( 

573 session, 

574 user_id=host_request.host_user_id, 

575 topic_action="host_request:cancel", 

576 key=str(host_request.conversation_id), 

577 data=notification_data_pb2.HostRequestCancel( 

578 host_request=host_request_to_pb(host_request, session, context), 

579 surfer=user_model_to_pb(host_request.surfer, session, context), 

580 ), 

581 moderation_state_id=host_request.moderation_state_id, 

582 ) 

583 

584 count_host_response(host_request.host_user_id, "cancelled") 

585 

586 control_message.message_type = MessageType.host_request_status_changed 

587 control_message.conversation_id = host_request.conversation_id 

588 control_message.author_id = context.user_id 

589 session.add(control_message) 

590 

591 if request.text: 

592 latest_message = Message() 

593 latest_message.conversation_id = host_request.conversation_id 

594 latest_message.text = request.text 

595 latest_message.author_id = context.user_id 

596 latest_message.message_type = MessageType.text 

597 session.add(latest_message) 

598 else: 

599 latest_message = control_message 

600 

601 session.flush() 

602 

603 if host_request.surfer_user_id == context.user_id: 

604 host_request.surfer_last_seen_message_id = latest_message.id 

605 else: 

606 host_request.host_last_seen_message_id = latest_message.id 

607 session.commit() 

608 

609 return empty_pb2.Empty() 

610 

611 def GetHostRequestMessages( 

612 self, request: requests_pb2.GetHostRequestMessagesReq, context: CouchersContext, session: Session 

613 ) -> requests_pb2.GetHostRequestMessagesRes: 

614 host_request = session.execute( 

615 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False).where( 

616 HostRequest.conversation_id == request.host_request_id 

617 ) 

618 ).scalar_one_or_none() 

619 

620 if not host_request: 620 ↛ 621line 620 didn't jump to line 621 because the condition on line 620 was never true

621 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

622 

623 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 623 ↛ 624line 623 didn't jump to line 624 because the condition on line 623 was never true

624 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

625 

626 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH 

627 pagination = min(pagination, MAX_PAGE_SIZE) 

628 

629 messages = ( 

630 session.execute( 

631 select(Message) 

632 .where(Message.conversation_id == host_request.conversation_id) 

633 .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0))) 

634 .order_by(Message.id.desc()) 

635 .limit(pagination + 1) 

636 ) 

637 .scalars() 

638 .all() 

639 ) 

640 

641 no_more = len(messages) <= pagination 

642 

643 last_message_id = min(m.id if m else 1 for m in messages[:pagination]) if len(messages) > 0 else 0 

644 

645 return requests_pb2.GetHostRequestMessagesRes( 

646 last_message_id=last_message_id, 

647 no_more=no_more, 

648 messages=[message_to_pb(message) for message in messages[:pagination]], 

649 ) 

650 

651 def SendHostRequestMessage( 

652 self, request: requests_pb2.SendHostRequestMessageReq, context: CouchersContext, session: Session 

653 ) -> empty_pb2.Empty: 

654 if request.text == "": 

655 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message") 

656 host_request = session.execute( 

657 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False).where( 

658 HostRequest.conversation_id == request.host_request_id 

659 ) 

660 ).scalar_one_or_none() 

661 

662 if not host_request: 

663 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

664 

665 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 

666 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

667 

668 if host_request.host_user_id == context.user_id: 

669 _possibly_observe_first_response_time(session, host_request, context.user_id, "message") 

670 

671 message = Message() 

672 message.conversation_id = host_request.conversation_id 

673 message.author_id = context.user_id 

674 message.message_type = MessageType.text 

675 message.text = request.text 

676 session.add(message) 

677 session.flush() 

678 

679 if host_request.surfer_user_id == context.user_id: 

680 host_request.surfer_last_seen_message_id = message.id 

681 

682 notify( 

683 session, 

684 user_id=host_request.host_user_id, 

685 topic_action="host_request:message", 

686 key=str(host_request.conversation_id), 

687 data=notification_data_pb2.HostRequestMessage( 

688 host_request=host_request_to_pb(host_request, session, context), 

689 user=user_model_to_pb(host_request.surfer, session, context), 

690 text=request.text, 

691 am_host=True, 

692 ), 

693 moderation_state_id=host_request.moderation_state_id, 

694 ) 

695 

696 else: 

697 host_request.host_last_seen_message_id = message.id 

698 

699 notify( 

700 session, 

701 user_id=host_request.surfer_user_id, 

702 topic_action="host_request:message", 

703 key=str(host_request.conversation_id), 

704 data=notification_data_pb2.HostRequestMessage( 

705 host_request=host_request_to_pb(host_request, session, context), 

706 user=user_model_to_pb(host_request.host, session, context), 

707 text=request.text, 

708 am_host=False, 

709 ), 

710 moderation_state_id=host_request.moderation_state_id, 

711 ) 

712 

713 session.commit() 

714 

715 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one() 

716 sent_messages_counter.labels(user_gender, "host request").inc() 

717 

718 return empty_pb2.Empty() 

719 

720 def GetHostRequestUpdates( 

721 self, request: requests_pb2.GetHostRequestUpdatesReq, context: CouchersContext, session: Session 

722 ) -> requests_pb2.GetHostRequestUpdatesRes: 

723 if request.only_sent and request.only_received: 723 ↛ 724line 723 didn't jump to line 724 because the condition on line 723 was never true

724 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received") 

725 

726 if request.newest_message_id == 0: 

727 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message") 

728 

729 if not session.execute(select(Message).where(Message.id == request.newest_message_id)).scalar_one_or_none(): 729 ↛ 730line 729 didn't jump to line 730 because the condition on line 729 was never true

730 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message") 

731 

732 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH 

733 pagination = min(pagination, MAX_PAGE_SIZE) 

734 

735 statement = where_moderated_content_visible( 

736 select( 

737 Message, 

738 HostRequest.status.label("host_request_status"), 

739 HostRequest.conversation_id.label("host_request_id"), 

740 ) 

741 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id) 

742 .where(Message.id > request.newest_message_id), 

743 context, 

744 HostRequest, 

745 is_list_operation=False, 

746 ) 

747 

748 if request.only_sent: 748 ↛ 749line 748 didn't jump to line 749 because the condition on line 748 was never true

749 statement = statement.where(HostRequest.surfer_user_id == context.user_id) 

750 elif request.only_received: 750 ↛ 751line 750 didn't jump to line 751 because the condition on line 750 was never true

751 statement = statement.where(HostRequest.host_user_id == context.user_id) 

752 else: 

753 statement = statement.where( 

754 or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id) 

755 ) 

756 

757 statement = statement.order_by(Message.id.asc()).limit(pagination + 1) 

758 res = session.execute(statement).all() 

759 

760 no_more = len(res) <= pagination 

761 

762 last_message_id = min(m.Message.id if m else 1 for m in res[:pagination]) if len(res) > 0 else 0 # TODO 

763 

764 return requests_pb2.GetHostRequestUpdatesRes( 

765 no_more=no_more, 

766 updates=[ 

767 requests_pb2.HostRequestUpdate( 

768 host_request_id=result.host_request_id, 

769 status=hostrequeststatus2api[result.host_request_status], 

770 message=message_to_pb(result.Message), 

771 ) 

772 for result in res[:pagination] 

773 ], 

774 ) 

775 

776 def MarkLastSeenHostRequest( 

777 self, request: requests_pb2.MarkLastSeenHostRequestReq, context: CouchersContext, session: Session 

778 ) -> empty_pb2.Empty: 

779 host_request = session.execute( 

780 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False).where( 

781 HostRequest.conversation_id == request.host_request_id 

782 ) 

783 ).scalar_one_or_none() 

784 

785 if not host_request: 785 ↛ 786line 785 didn't jump to line 786 because the condition on line 785 was never true

786 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

787 

788 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 788 ↛ 789line 788 didn't jump to line 789 because the condition on line 788 was never true

789 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

790 

791 if host_request.surfer_user_id == context.user_id: 791 ↛ 792line 791 didn't jump to line 792 because the condition on line 791 was never true

792 if not host_request.surfer_last_seen_message_id <= request.last_seen_message_id: 

793 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages") 

794 host_request.surfer_last_seen_message_id = request.last_seen_message_id 

795 else: 

796 if not host_request.host_last_seen_message_id <= request.last_seen_message_id: 

797 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages") 

798 host_request.host_last_seen_message_id = request.last_seen_message_id 

799 

800 session.commit() 

801 return empty_pb2.Empty() 

802 

803 def SetHostRequestArchiveStatus( 

804 self, request: requests_pb2.SetHostRequestArchiveStatusReq, context: CouchersContext, session: Session 

805 ) -> requests_pb2.SetHostRequestArchiveStatusRes: 

806 host_request = session.execute( 

807 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False) 

808 .where(HostRequest.conversation_id == request.host_request_id) 

809 .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id)) 

810 ).scalar_one_or_none() 

811 

812 if not host_request: 812 ↛ 813line 812 didn't jump to line 813 because the condition on line 812 was never true

813 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

814 

815 if context.user_id == host_request.surfer_user_id: 815 ↛ 818line 815 didn't jump to line 818 because the condition on line 815 was always true

816 host_request.is_surfer_archived = request.is_archived 

817 else: 

818 host_request.is_host_archived = request.is_archived 

819 

820 return requests_pb2.SetHostRequestArchiveStatusRes( 

821 host_request_id=host_request.conversation_id, 

822 is_archived=request.is_archived, 

823 ) 

824 

825 def GetResponseRate( 

826 self, request: requests_pb2.GetResponseRateReq, context: CouchersContext, session: Session 

827 ) -> requests_pb2.GetResponseRateRes: 

828 user_res = session.execute( 

829 select(User.id, UserResponseRate) 

830 .outerjoin(UserResponseRate, UserResponseRate.user_id == User.id) 

831 .where(users_visible(context, User)) 

832 .where(User.id == request.user_id) 

833 ).one_or_none() 

834 

835 # if user doesn't exist, return None 

836 if not user_res: 

837 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

838 

839 user, response_rates = user_res 

840 return requests_pb2.GetResponseRateRes(**response_rate_to_pb(response_rates)) # type: ignore[arg-type] 

841 

842 def SendHostRequestFeedback( 

843 self, request: requests_pb2.SendHostRequestFeedbackReq, context: CouchersContext, session: Session 

844 ) -> empty_pb2.Empty: 

845 host_request = session.execute( 

846 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False) 

847 .where(HostRequest.conversation_id == request.host_request_id) 

848 .where(HostRequest.host_user_id == context.user_id) 

849 ).scalar_one_or_none() 

850 

851 if not host_request: 

852 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

853 

854 feedback = session.execute( 

855 select(HostRequestFeedback) 

856 .where(HostRequestFeedback.host_request_id == host_request.conversation_id) 

857 .where(HostRequestFeedback.from_user_id == context.user_id) 

858 ).scalar_one_or_none() 

859 

860 if feedback: 

861 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_left_host_request_feedback") 

862 

863 session.add( 

864 HostRequestFeedback( 

865 host_request_id=host_request.conversation_id, 

866 from_user_id=host_request.host_user_id, 

867 to_user_id=host_request.surfer_user_id, 

868 request_quality=hostrequestquality2sql.get(request.host_request_quality), 

869 decline_reason=request.decline_reason, 

870 ) 

871 ) 

872 

873 return empty_pb2.Empty()