Coverage for src/couchers/servicers/requests.py: 93%

284 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-08-28 14:55 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy import exists 

7from sqlalchemy.orm import aliased 

8from sqlalchemy.sql import and_, func, or_ 

9 

10from couchers import errors 

11from couchers.materialized_views import UserResponseRate 

12from couchers.metrics import ( 

13 account_age_on_host_request_create_histogram, 

14 host_request_first_response_histogram, 

15 host_request_responses_counter, 

16 host_requests_sent_counter, 

17 sent_messages_counter, 

18) 

19from couchers.models import ( 

20 Conversation, 

21 HostRequest, 

22 HostRequestFeedback, 

23 HostRequestQuality, 

24 HostRequestStatus, 

25 Message, 

26 MessageType, 

27 RateLimitAction, 

28 User, 

29) 

30from couchers.notifications.notify import notify 

31from couchers.rate_limits.check import process_rate_limits_and_check_abort 

32from couchers.servicers.api import response_rate_to_pb, user_model_to_pb 

33from couchers.sql import couchers_select as select 

34from couchers.utils import ( 

35 Timestamp_from_datetime, 

36 date_to_api, 

37 get_coordinates, 

38 now, 

39 parse_date, 

40 today_in_timezone, 

41) 

42from proto import conversations_pb2, notification_data_pb2, requests_pb2, requests_pb2_grpc 

43 

44logger = logging.getLogger(__name__) 

45 

46DEFAULT_PAGINATION_LENGTH = 10 

47MAX_PAGE_SIZE = 50 

48 

49 

50hostrequeststatus2api = { 

51 HostRequestStatus.pending: conversations_pb2.HOST_REQUEST_STATUS_PENDING, 

52 HostRequestStatus.accepted: conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED, 

53 HostRequestStatus.rejected: conversations_pb2.HOST_REQUEST_STATUS_REJECTED, 

54 HostRequestStatus.confirmed: conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED, 

55 HostRequestStatus.cancelled: conversations_pb2.HOST_REQUEST_STATUS_CANCELLED, 

56} 

57 

58hostrequestquality2sql = { 

59 requests_pb2.HOST_REQUEST_QUALITY_UNSPECIFIED: HostRequestQuality.high_quality, 

60 requests_pb2.HOST_REQUEST_QUALITY_LOW: HostRequestQuality.okay_quality, 

61 requests_pb2.HOST_REQUEST_QUALITY_OKAY: HostRequestQuality.low_quality, 

62} 

63 

64 

65def message_to_pb(message: Message): 

66 """ 

67 Turns the given message to a protocol buffer 

68 """ 

69 if message.is_normal_message: 

70 return conversations_pb2.Message( 

71 message_id=message.id, 

72 author_user_id=message.author_id, 

73 time=Timestamp_from_datetime(message.time), 

74 text=conversations_pb2.MessageContentText(text=message.text), 

75 ) 

76 else: 

77 return conversations_pb2.Message( 

78 message_id=message.id, 

79 author_user_id=message.author_id, 

80 time=Timestamp_from_datetime(message.time), 

81 chat_created=( 

82 conversations_pb2.MessageContentChatCreated() 

83 if message.message_type == MessageType.chat_created 

84 else None 

85 ), 

86 host_request_status_changed=( 

87 conversations_pb2.MessageContentHostRequestStatusChanged( 

88 status=hostrequeststatus2api[message.host_request_status_target] 

89 ) 

90 if message.message_type == MessageType.host_request_status_changed 

91 else None 

92 ), 

93 ) 

94 

95 

96def host_request_to_pb(host_request: HostRequest, session, context): 

97 initial_message = session.execute( 

98 select(Message) 

99 .where(Message.conversation_id == host_request.conversation_id) 

100 .order_by(Message.id.asc()) 

101 .limit(1) 

102 ).scalar_one() 

103 

104 latest_message = session.execute( 

105 select(Message) 

106 .where(Message.conversation_id == host_request.conversation_id) 

107 .order_by(Message.id.desc()) 

108 .limit(1) 

109 ).scalar_one() 

110 

111 lat, lng = get_coordinates(host_request.hosting_location) 

112 

113 need_feedback = False 

114 if context.user_id == host_request.host_user_id and host_request.status == HostRequestStatus.rejected: 

115 need_feedback = not session.execute( 

116 select( 

117 exists().where( 

118 HostRequestFeedback.from_user_id == context.user_id, 

119 HostRequestFeedback.host_request_id == host_request.conversation_id, 

120 ) 

121 ) 

122 ).scalar_one() 

123 

124 return requests_pb2.HostRequest( 

125 host_request_id=host_request.conversation_id, 

126 surfer_user_id=host_request.surfer_user_id, 

127 host_user_id=host_request.host_user_id, 

128 status=hostrequeststatus2api[host_request.status], 

129 created=Timestamp_from_datetime(initial_message.time), 

130 from_date=date_to_api(host_request.from_date), 

131 to_date=date_to_api(host_request.to_date), 

132 last_seen_message_id=( 

133 host_request.surfer_last_seen_message_id 

134 if context.user_id == host_request.surfer_user_id 

135 else host_request.host_last_seen_message_id 

136 ), 

137 latest_message=message_to_pb(latest_message), 

138 hosting_city=host_request.hosting_city, 

139 hosting_lat=lat, 

140 hosting_lng=lng, 

141 hosting_radius=host_request.hosting_radius, 

142 need_host_request_feedback=need_feedback, 

143 ) 

144 

145 

146def _possibly_observe_first_response_time(session, host_request, user_id, response_type): 

147 # if this is the first response then there's nothing by this user yet 

148 assert host_request.host_user_id == user_id 

149 

150 number_messages_by_host = session.execute( 

151 select(func.count()) 

152 .where(Message.conversation_id == host_request.conversation_id) 

153 .where(Message.author_id == user_id) 

154 ).scalar_one_or_none() 

155 

156 if number_messages_by_host == 0: 

157 host_gender = session.execute(select(User.gender).where(User.id == host_request.host_user_id)).scalar_one() 

158 surfer_gender = session.execute(select(User.gender).where(User.id == host_request.surfer_user_id)).scalar_one() 

159 host_request_first_response_histogram.labels(host_gender, surfer_gender, response_type).observe( 

160 (now() - host_request.conversation.created).total_seconds() 

161 ) 

162 

163 

164class Requests(requests_pb2_grpc.RequestsServicer): 

165 def CreateHostRequest(self, request, context, session): 

166 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

167 if not user.has_completed_profile: 

168 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.INCOMPLETE_PROFILE_SEND_REQUEST) 

169 

170 if request.host_user_id == context.user_id: 

171 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.CANT_REQUEST_SELF) 

172 

173 # just to check host exists and is visible 

174 host = session.execute( 

175 select(User).where_users_visible(context).where(User.id == request.host_user_id) 

176 ).scalar_one_or_none() 

177 if not host: 

178 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

179 

180 from_date = parse_date(request.from_date) 

181 to_date = parse_date(request.to_date) 

182 

183 if not from_date or not to_date: 

184 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_DATE) 

185 

186 today = today_in_timezone(host.timezone) 

187 

188 # request starts from the past 

189 if from_date < today: 

190 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_FROM_BEFORE_TODAY) 

191 

192 # from_date is not >= to_date 

193 if from_date >= to_date: 

194 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_FROM_AFTER_TO) 

195 

196 # No need to check today > to_date 

197 

198 if from_date - today > timedelta(days=365): 

199 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_FROM_AFTER_ONE_YEAR) 

200 

201 if to_date - from_date > timedelta(days=365): 

202 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_TO_AFTER_ONE_YEAR) 

203 

204 # Check if user has been sending host requests excessively 

205 if process_rate_limits_and_check_abort( 

206 session=session, user_id=context.user_id, action=RateLimitAction.host_request 

207 ): 

208 context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, errors.HOST_REQUEST_RATE_LIMIT) 

209 

210 conversation = Conversation() 

211 session.add(conversation) 

212 session.flush() 

213 

214 session.add( 

215 Message( 

216 conversation_id=conversation.id, 

217 author_id=context.user_id, 

218 message_type=MessageType.chat_created, 

219 ) 

220 ) 

221 

222 message = Message( 

223 conversation_id=conversation.id, 

224 author_id=context.user_id, 

225 text=request.text, 

226 message_type=MessageType.text, 

227 ) 

228 session.add(message) 

229 session.flush() 

230 

231 host_request = HostRequest( 

232 conversation_id=conversation.id, 

233 surfer_user_id=context.user_id, 

234 host_user_id=host.id, 

235 from_date=from_date, 

236 to_date=to_date, 

237 status=HostRequestStatus.pending, 

238 surfer_last_seen_message_id=message.id, 

239 # TODO: tz 

240 # timezone=host.timezone, 

241 hosting_city=host.city, 

242 hosting_location=host.geom, 

243 hosting_radius=host.geom_radius, 

244 ) 

245 session.add(host_request) 

246 session.commit() 

247 

248 notify( 

249 session, 

250 user_id=host_request.host_user_id, 

251 topic_action="host_request:create", 

252 key=host_request.conversation_id, 

253 data=notification_data_pb2.HostRequestCreate( 

254 host_request=host_request_to_pb(host_request, session, context), 

255 surfer=user_model_to_pb(host_request.surfer, session, context), 

256 text=request.text, 

257 ), 

258 ) 

259 

260 host_requests_sent_counter.labels(user.gender, host.gender).inc() 

261 sent_messages_counter.labels(user.gender, "host request send").inc() 

262 account_age_on_host_request_create_histogram.labels(user.gender, host.gender).observe( 

263 (now() - user.joined).total_seconds() 

264 ) 

265 

266 return requests_pb2.CreateHostRequestRes(host_request_id=host_request.conversation_id) 

267 

268 def GetHostRequest(self, request, context, session): 

269 host_request = session.execute( 

270 select(HostRequest) 

271 .where_users_column_visible(context, HostRequest.surfer_user_id) 

272 .where_users_column_visible(context, HostRequest.host_user_id) 

273 .where(HostRequest.conversation_id == request.host_request_id) 

274 .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id)) 

275 ).scalar_one_or_none() 

276 

277 if not host_request: 

278 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND) 

279 

280 return host_request_to_pb(host_request, session, context) 

281 

282 def ListHostRequests(self, request, context, session): 

283 if request.only_sent and request.only_received: 

284 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.HOST_REQUEST_SENT_OR_RECEIVED) 

285 

286 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH 

287 pagination = min(pagination, MAX_PAGE_SIZE) 

288 

289 # By outer joining messages on itself where the second id is bigger, only the highest IDs will have 

290 # none as message_2.id. So just filter for these ones to get highest messages only. 

291 # See https://stackoverflow.com/a/27802817/6115336 

292 message_2 = aliased(Message) 

293 statement = ( 

294 select(Message, HostRequest, Conversation) 

295 .outerjoin(message_2, and_(Message.conversation_id == message_2.conversation_id, Message.id < message_2.id)) 

296 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id) 

297 .join(Conversation, Conversation.id == HostRequest.conversation_id) 

298 .where_users_column_visible(context, HostRequest.surfer_user_id) 

299 .where_users_column_visible(context, HostRequest.host_user_id) 

300 .where(message_2.id == None) 

301 .where(or_(Message.id < request.last_request_id, request.last_request_id == 0)) 

302 ) 

303 

304 if request.only_sent: 

305 statement = statement.where(HostRequest.surfer_user_id == context.user_id) 

306 elif request.only_received: 

307 statement = statement.where(HostRequest.host_user_id == context.user_id) 

308 elif request.HasField("only_archived"): 

309 statement = statement.where( 

310 or_( 

311 and_( 

312 HostRequest.surfer_user_id == context.user_id, 

313 HostRequest.is_surfer_archived == request.only_archived, 

314 ), 

315 and_( 

316 HostRequest.host_user_id == context.user_id, 

317 HostRequest.is_host_archived == request.only_archived, 

318 ), 

319 ) 

320 ) 

321 else: 

322 statement = statement.where( 

323 or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id) 

324 ) 

325 

326 # TODO: I considered having the latest control message be the single source of truth for 

327 # the HostRequest.status, but decided against it because of this filter. 

328 # Another possibility is to filter in the python instead of SQL, but that's slower 

329 if request.only_active: 

330 statement = statement.where( 

331 or_( 

332 HostRequest.status == HostRequestStatus.pending, 

333 HostRequest.status == HostRequestStatus.accepted, 

334 HostRequest.status == HostRequestStatus.confirmed, 

335 ) 

336 ) 

337 statement = statement.where(HostRequest.end_time <= func.now()) 

338 

339 statement = statement.order_by(Message.id.desc()).limit(pagination + 1) 

340 results = session.execute(statement).all() 

341 

342 host_requests = [] 

343 for result in results[:pagination]: 

344 lat, lng = get_coordinates(result.HostRequest.hosting_location) 

345 host_requests.append( 

346 requests_pb2.HostRequest( 

347 host_request_id=result.HostRequest.conversation_id, 

348 surfer_user_id=result.HostRequest.surfer_user_id, 

349 host_user_id=result.HostRequest.host_user_id, 

350 status=hostrequeststatus2api[result.HostRequest.status], 

351 created=Timestamp_from_datetime(result.Conversation.created), 

352 from_date=date_to_api(result.HostRequest.from_date), 

353 to_date=date_to_api(result.HostRequest.to_date), 

354 last_seen_message_id=( 

355 result.HostRequest.surfer_last_seen_message_id 

356 if context.user_id == result.HostRequest.surfer_user_id 

357 else result.HostRequest.host_last_seen_message_id 

358 ), 

359 latest_message=message_to_pb(result.Message), 

360 hosting_city=result.HostRequest.hosting_city, 

361 hosting_lat=lat, 

362 hosting_lng=lng, 

363 hosting_radius=result.HostRequest.hosting_radius, 

364 ) 

365 ) 

366 

367 last_request_id = min(g.Message.id for g in results[:pagination]) if len(results) > pagination else 0 # TODO 

368 no_more = len(results) <= pagination 

369 

370 return requests_pb2.ListHostRequestsRes( 

371 last_request_id=last_request_id, no_more=no_more, host_requests=host_requests 

372 ) 

373 

374 def RespondHostRequest(self, request, context, session): 

375 def count_host_response(other_user_id, response_type): 

376 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one() 

377 other_gender = session.execute(select(User.gender).where(User.id == other_user_id)).scalar_one() 

378 host_request_responses_counter.labels(user_gender, other_gender, response_type).inc() 

379 sent_messages_counter.labels(user_gender, "host request response").inc() 

380 

381 host_request = session.execute( 

382 select(HostRequest) 

383 .where_users_column_visible(context, HostRequest.surfer_user_id) 

384 .where_users_column_visible(context, HostRequest.host_user_id) 

385 .where(HostRequest.conversation_id == request.host_request_id) 

386 ).scalar_one_or_none() 

387 

388 if not host_request: 

389 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND) 

390 

391 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 

392 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND) 

393 

394 if request.status == conversations_pb2.HOST_REQUEST_STATUS_PENDING: 

395 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS) 

396 

397 if host_request.end_time < now(): 

398 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.HOST_REQUEST_IN_PAST) 

399 

400 control_message = Message() 

401 

402 if request.status == conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED: 

403 # only host can accept 

404 if context.user_id != host_request.host_user_id: 

405 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.NOT_THE_HOST) 

406 # can't accept a cancelled or confirmed request (only reject), or already accepted 

407 if ( 

408 host_request.status == HostRequestStatus.cancelled 

409 or host_request.status == HostRequestStatus.confirmed 

410 or host_request.status == HostRequestStatus.accepted 

411 ): 

412 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS) 

413 _possibly_observe_first_response_time(session, host_request, context.user_id, "accepted") 

414 control_message.host_request_status_target = HostRequestStatus.accepted 

415 host_request.status = HostRequestStatus.accepted 

416 session.flush() 

417 

418 notify( 

419 session, 

420 user_id=host_request.surfer_user_id, 

421 topic_action="host_request:accept", 

422 key=host_request.conversation_id, 

423 data=notification_data_pb2.HostRequestAccept( 

424 host_request=host_request_to_pb(host_request, session, context), 

425 host=user_model_to_pb(host_request.host, session, context), 

426 ), 

427 ) 

428 

429 count_host_response(host_request.surfer_user_id, "accepted") 

430 

431 if request.status == conversations_pb2.HOST_REQUEST_STATUS_REJECTED: 

432 # only host can reject 

433 if context.user_id != host_request.host_user_id: 

434 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS) 

435 # can't reject a cancelled or already rejected request 

436 if host_request.status == HostRequestStatus.cancelled or host_request.status == HostRequestStatus.rejected: 

437 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS) 

438 _possibly_observe_first_response_time(session, host_request, context.user_id, "rejected") 

439 control_message.host_request_status_target = HostRequestStatus.rejected 

440 host_request.status = HostRequestStatus.rejected 

441 session.flush() 

442 

443 notify( 

444 session, 

445 user_id=host_request.surfer_user_id, 

446 topic_action="host_request:reject", 

447 key=host_request.conversation_id, 

448 data=notification_data_pb2.HostRequestReject( 

449 host_request=host_request_to_pb(host_request, session, context), 

450 host=user_model_to_pb(host_request.host, session, context), 

451 ), 

452 ) 

453 

454 count_host_response(host_request.surfer_user_id, "rejected") 

455 

456 if request.status == conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED: 

457 # only surfer can confirm 

458 if context.user_id != host_request.surfer_user_id: 

459 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS) 

460 # can only confirm an accepted request 

461 if host_request.status != HostRequestStatus.accepted: 

462 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS) 

463 control_message.host_request_status_target = HostRequestStatus.confirmed 

464 host_request.status = HostRequestStatus.confirmed 

465 session.flush() 

466 

467 notify( 

468 session, 

469 user_id=host_request.host_user_id, 

470 topic_action="host_request:confirm", 

471 key=host_request.conversation_id, 

472 data=notification_data_pb2.HostRequestConfirm( 

473 host_request=host_request_to_pb(host_request, session, context), 

474 surfer=user_model_to_pb(host_request.surfer, session, context), 

475 ), 

476 ) 

477 

478 count_host_response(host_request.host_user_id, "confirmed") 

479 

480 if request.status == conversations_pb2.HOST_REQUEST_STATUS_CANCELLED: 

481 # only surfer can cancel 

482 if context.user_id != host_request.surfer_user_id: 

483 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS) 

484 # can't' cancel an already cancelled or rejected request 

485 if host_request.status == HostRequestStatus.rejected or host_request.status == HostRequestStatus.cancelled: 

486 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS) 

487 control_message.host_request_status_target = HostRequestStatus.cancelled 

488 host_request.status = HostRequestStatus.cancelled 

489 session.flush() 

490 

491 notify( 

492 session, 

493 user_id=host_request.host_user_id, 

494 topic_action="host_request:cancel", 

495 key=host_request.conversation_id, 

496 data=notification_data_pb2.HostRequestCancel( 

497 host_request=host_request_to_pb(host_request, session, context), 

498 surfer=user_model_to_pb(host_request.surfer, session, context), 

499 ), 

500 ) 

501 

502 count_host_response(host_request.host_user_id, "cancelled") 

503 

504 control_message.message_type = MessageType.host_request_status_changed 

505 control_message.conversation_id = host_request.conversation_id 

506 control_message.author_id = context.user_id 

507 session.add(control_message) 

508 

509 if request.text: 

510 latest_message = Message() 

511 latest_message.conversation_id = host_request.conversation_id 

512 latest_message.text = request.text 

513 latest_message.author_id = context.user_id 

514 latest_message.message_type = MessageType.text 

515 session.add(latest_message) 

516 else: 

517 latest_message = control_message 

518 

519 session.flush() 

520 

521 if host_request.surfer_user_id == context.user_id: 

522 host_request.surfer_last_seen_message_id = latest_message.id 

523 else: 

524 host_request.host_last_seen_message_id = latest_message.id 

525 session.commit() 

526 

527 return empty_pb2.Empty() 

528 

529 def GetHostRequestMessages(self, request, context, session): 

530 host_request = session.execute( 

531 select(HostRequest).where(HostRequest.conversation_id == request.host_request_id) 

532 ).scalar_one_or_none() 

533 

534 if not host_request: 

535 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND) 

536 

537 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 

538 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND) 

539 

540 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH 

541 pagination = min(pagination, MAX_PAGE_SIZE) 

542 

543 messages = ( 

544 session.execute( 

545 select(Message) 

546 .where(Message.conversation_id == host_request.conversation_id) 

547 .where(or_(Message.id < request.last_message_id, request.last_message_id == 0)) 

548 .order_by(Message.id.desc()) 

549 .limit(pagination + 1) 

550 ) 

551 .scalars() 

552 .all() 

553 ) 

554 

555 no_more = len(messages) <= pagination 

556 

557 last_message_id = min(m.id if m else 1 for m in messages[:pagination]) if len(messages) > 0 else 0 

558 

559 return requests_pb2.GetHostRequestMessagesRes( 

560 last_message_id=last_message_id, 

561 no_more=no_more, 

562 messages=[message_to_pb(message) for message in messages[:pagination]], 

563 ) 

564 

565 def SendHostRequestMessage(self, request, context, session): 

566 if request.text == "": 

567 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE) 

568 host_request = session.execute( 

569 select(HostRequest).where(HostRequest.conversation_id == request.host_request_id) 

570 ).scalar_one_or_none() 

571 

572 if not host_request: 

573 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND) 

574 

575 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 

576 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND) 

577 

578 if host_request.host_user_id == context.user_id: 

579 _possibly_observe_first_response_time(session, host_request, context.user_id, "message") 

580 

581 message = Message() 

582 message.conversation_id = host_request.conversation_id 

583 message.author_id = context.user_id 

584 message.message_type = MessageType.text 

585 message.text = request.text 

586 session.add(message) 

587 session.flush() 

588 

589 if host_request.surfer_user_id == context.user_id: 

590 host_request.surfer_last_seen_message_id = message.id 

591 

592 notify( 

593 session, 

594 user_id=host_request.host_user_id, 

595 topic_action="host_request:message", 

596 key=host_request.conversation_id, 

597 data=notification_data_pb2.HostRequestMessage( 

598 host_request=host_request_to_pb(host_request, session, context), 

599 user=user_model_to_pb(host_request.surfer, session, context), 

600 text=request.text, 

601 am_host=True, 

602 ), 

603 ) 

604 

605 else: 

606 host_request.host_last_seen_message_id = message.id 

607 

608 notify( 

609 session, 

610 user_id=host_request.surfer_user_id, 

611 topic_action="host_request:message", 

612 key=host_request.conversation_id, 

613 data=notification_data_pb2.HostRequestMessage( 

614 host_request=host_request_to_pb(host_request, session, context), 

615 user=user_model_to_pb(host_request.host, session, context), 

616 text=request.text, 

617 am_host=False, 

618 ), 

619 ) 

620 

621 session.commit() 

622 

623 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one() 

624 sent_messages_counter.labels(user_gender, "host request").inc() 

625 

626 return empty_pb2.Empty() 

627 

628 def GetHostRequestUpdates(self, request, context, session): 

629 if request.only_sent and request.only_received: 

630 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.HOST_REQUEST_SENT_OR_RECEIVED) 

631 

632 if request.newest_message_id == 0: 

633 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE) 

634 

635 if not session.execute(select(Message).where(Message.id == request.newest_message_id)).scalar_one_or_none(): 

636 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE) 

637 

638 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH 

639 pagination = min(pagination, MAX_PAGE_SIZE) 

640 

641 statement = ( 

642 select( 

643 Message, 

644 HostRequest.status.label("host_request_status"), 

645 HostRequest.conversation_id.label("host_request_id"), 

646 ) 

647 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id) 

648 .where(Message.id > request.newest_message_id) 

649 ) 

650 

651 if request.only_sent: 

652 statement = statement.where(HostRequest.surfer_user_id == context.user_id) 

653 elif request.only_received: 

654 statement = statement.where(HostRequest.host_user_id == context.user_id) 

655 else: 

656 statement = statement.where( 

657 or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id) 

658 ) 

659 

660 statement = statement.order_by(Message.id.asc()).limit(pagination + 1) 

661 res = session.execute(statement).all() 

662 

663 no_more = len(res) <= pagination 

664 

665 last_message_id = min(m.Message.id if m else 1 for m in res[:pagination]) if len(res) > 0 else 0 # TODO 

666 

667 return requests_pb2.GetHostRequestUpdatesRes( 

668 no_more=no_more, 

669 updates=[ 

670 requests_pb2.HostRequestUpdate( 

671 host_request_id=result.host_request_id, 

672 status=hostrequeststatus2api[result.host_request_status], 

673 message=message_to_pb(result.Message), 

674 ) 

675 for result in res[:pagination] 

676 ], 

677 ) 

678 

679 def MarkLastSeenHostRequest(self, request, context, session): 

680 host_request = session.execute( 

681 select(HostRequest).where(HostRequest.conversation_id == request.host_request_id) 

682 ).scalar_one_or_none() 

683 

684 if not host_request: 

685 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND) 

686 

687 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 

688 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND) 

689 

690 if host_request.surfer_user_id == context.user_id: 

691 if not host_request.surfer_last_seen_message_id <= request.last_seen_message_id: 

692 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNSEE_MESSAGES) 

693 host_request.surfer_last_seen_message_id = request.last_seen_message_id 

694 else: 

695 if not host_request.host_last_seen_message_id <= request.last_seen_message_id: 

696 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNSEE_MESSAGES) 

697 host_request.host_last_seen_message_id = request.last_seen_message_id 

698 

699 session.commit() 

700 return empty_pb2.Empty() 

701 

702 def SetHostRequestArchiveStatus(self, request, context, session): 

703 host_request: HostRequest = session.execute( 

704 select(HostRequest) 

705 .where(HostRequest.conversation_id == request.host_request_id) 

706 .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id)) 

707 ).scalar_one_or_none() 

708 

709 if not host_request: 

710 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND) 

711 

712 if context.user_id == host_request.surfer_user_id: 

713 host_request.is_surfer_archived = request.is_archived 

714 else: 

715 host_request.is_host_archived = request.is_archived 

716 

717 return requests_pb2.SetHostRequestArchiveStatusRes( 

718 host_request_id=host_request.conversation_id, 

719 is_archived=request.is_archived, 

720 ) 

721 

722 def GetResponseRate(self, request, context, session): 

723 user_res = session.execute( 

724 select(User.id, UserResponseRate) 

725 .outerjoin(UserResponseRate, UserResponseRate.user_id == User.id) 

726 .where_users_visible(context) 

727 .where(User.id == request.user_id) 

728 ).one_or_none() 

729 

730 # if user doesn't exist, return None 

731 if not user_res: 

732 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

733 

734 user, response_rates = user_res 

735 return requests_pb2.GetResponseRateRes(**response_rate_to_pb(response_rates)) 

736 

737 def SendHostRequestFeedback(self, request, context, session): 

738 host_request = session.execute( 

739 select(HostRequest) 

740 .where(HostRequest.conversation_id == request.host_request_id) 

741 .where(HostRequest.host_user_id == context.user_id) 

742 ).scalar_one_or_none() 

743 

744 if not host_request: 

745 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND) 

746 

747 feedback = session.execute( 

748 select(HostRequestFeedback) 

749 .where(HostRequestFeedback.host_request_id == host_request.conversation_id) 

750 .where(HostRequestFeedback.from_user_id == context.user_id) 

751 ).scalar_one_or_none() 

752 

753 if feedback: 

754 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_LEFT_HOST_REQUEST_FEEDBACK) 

755 

756 session.add( 

757 HostRequestFeedback( 

758 host_request_id=host_request.conversation_id, 

759 from_user_id=host_request.host_user_id, 

760 to_user_id=host_request.surfer_user_id, 

761 request_quality=hostrequestquality2sql.get(request.host_request_quality), 

762 decline_reason=request.decline_reason, 

763 ) 

764 ) 

765 

766 return empty_pb2.Empty()