Coverage for src/couchers/servicers/requests.py: 93%

284 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-21 02:54 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy import exists 

7from sqlalchemy.orm import aliased 

8from sqlalchemy.sql import and_, func, or_ 

9 

10from couchers.materialized_views import UserResponseRate 

11from couchers.metrics import ( 

12 account_age_on_host_request_create_histogram, 

13 host_request_first_response_histogram, 

14 host_request_responses_counter, 

15 host_requests_sent_counter, 

16 sent_messages_counter, 

17) 

18from couchers.models import ( 

19 Conversation, 

20 HostRequest, 

21 HostRequestFeedback, 

22 HostRequestQuality, 

23 HostRequestStatus, 

24 Message, 

25 MessageType, 

26 RateLimitAction, 

27 User, 

28) 

29from couchers.notifications.notify import notify 

30from couchers.proto import conversations_pb2, notification_data_pb2, requests_pb2, requests_pb2_grpc 

31from couchers.rate_limits.check import process_rate_limits_and_check_abort 

32from couchers.rate_limits.definitions import RATE_LIMIT_HOURS 

33from couchers.servicers.api import response_rate_to_pb, user_model_to_pb 

34from couchers.sql import couchers_select as select 

35from couchers.utils import ( 

36 Timestamp_from_datetime, 

37 date_to_api, 

38 get_coordinates, 

39 now, 

40 parse_date, 

41 today_in_timezone, 

42) 

43 

44logger = logging.getLogger(__name__) 

45 

46DEFAULT_PAGINATION_LENGTH = 10 

47MAX_PAGE_SIZE = 50 

48 

49 

50hostrequeststatus2api = { 

51 HostRequestStatus.pending: conversations_pb2.HOST_REQUEST_STATUS_PENDING, 

52 HostRequestStatus.accepted: conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED, 

53 HostRequestStatus.rejected: conversations_pb2.HOST_REQUEST_STATUS_REJECTED, 

54 HostRequestStatus.confirmed: conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED, 

55 HostRequestStatus.cancelled: conversations_pb2.HOST_REQUEST_STATUS_CANCELLED, 

56} 

57 

58hostrequestquality2sql = { 

59 requests_pb2.HOST_REQUEST_QUALITY_UNSPECIFIED: HostRequestQuality.high_quality, 

60 requests_pb2.HOST_REQUEST_QUALITY_LOW: HostRequestQuality.okay_quality, 

61 requests_pb2.HOST_REQUEST_QUALITY_OKAY: HostRequestQuality.low_quality, 

62} 

63 

64 

65def message_to_pb(message: Message): 

66 """ 

67 Turns the given message to a protocol buffer 

68 """ 

69 if message.is_normal_message: 

70 return conversations_pb2.Message( 

71 message_id=message.id, 

72 author_user_id=message.author_id, 

73 time=Timestamp_from_datetime(message.time), 

74 text=conversations_pb2.MessageContentText(text=message.text), 

75 ) 

76 else: 

77 return conversations_pb2.Message( 

78 message_id=message.id, 

79 author_user_id=message.author_id, 

80 time=Timestamp_from_datetime(message.time), 

81 chat_created=( 

82 conversations_pb2.MessageContentChatCreated() 

83 if message.message_type == MessageType.chat_created 

84 else None 

85 ), 

86 host_request_status_changed=( 

87 conversations_pb2.MessageContentHostRequestStatusChanged( 

88 status=hostrequeststatus2api[message.host_request_status_target] 

89 ) 

90 if message.message_type == MessageType.host_request_status_changed 

91 else None 

92 ), 

93 ) 

94 

95 

96def host_request_to_pb(host_request: HostRequest, session, context): 

97 initial_message = session.execute( 

98 select(Message) 

99 .where(Message.conversation_id == host_request.conversation_id) 

100 .order_by(Message.id.asc()) 

101 .limit(1) 

102 ).scalar_one() 

103 

104 latest_message = session.execute( 

105 select(Message) 

106 .where(Message.conversation_id == host_request.conversation_id) 

107 .order_by(Message.id.desc()) 

108 .limit(1) 

109 ).scalar_one() 

110 

111 lat, lng = get_coordinates(host_request.hosting_location) 

112 

113 need_feedback = False 

114 if context.user_id == host_request.host_user_id and host_request.status == HostRequestStatus.rejected: 

115 need_feedback = not session.execute( 

116 select( 

117 exists().where( 

118 HostRequestFeedback.from_user_id == context.user_id, 

119 HostRequestFeedback.host_request_id == host_request.conversation_id, 

120 ) 

121 ) 

122 ).scalar_one() 

123 

124 return requests_pb2.HostRequest( 

125 host_request_id=host_request.conversation_id, 

126 surfer_user_id=host_request.surfer_user_id, 

127 host_user_id=host_request.host_user_id, 

128 status=hostrequeststatus2api[host_request.status], 

129 created=Timestamp_from_datetime(initial_message.time), 

130 from_date=date_to_api(host_request.from_date), 

131 to_date=date_to_api(host_request.to_date), 

132 last_seen_message_id=( 

133 host_request.surfer_last_seen_message_id 

134 if context.user_id == host_request.surfer_user_id 

135 else host_request.host_last_seen_message_id 

136 ), 

137 latest_message=message_to_pb(latest_message), 

138 hosting_city=host_request.hosting_city, 

139 hosting_lat=lat, 

140 hosting_lng=lng, 

141 hosting_radius=host_request.hosting_radius, 

142 need_host_request_feedback=need_feedback, 

143 ) 

144 

145 

146def _possibly_observe_first_response_time(session, host_request, user_id, response_type): 

147 # if this is the first response then there's nothing by this user yet 

148 assert host_request.host_user_id == user_id 

149 

150 number_messages_by_host = session.execute( 

151 select(func.count()) 

152 .where(Message.conversation_id == host_request.conversation_id) 

153 .where(Message.author_id == user_id) 

154 ).scalar_one_or_none() 

155 

156 if number_messages_by_host == 0: 

157 host_gender = session.execute(select(User.gender).where(User.id == host_request.host_user_id)).scalar_one() 

158 surfer_gender = session.execute(select(User.gender).where(User.id == host_request.surfer_user_id)).scalar_one() 

159 host_request_first_response_histogram.labels(host_gender, surfer_gender, response_type).observe( 

160 (now() - host_request.conversation.created).total_seconds() 

161 ) 

162 

163 

164class Requests(requests_pb2_grpc.RequestsServicer): 

165 def CreateHostRequest(self, request, context, session): 

166 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

167 if not user.has_completed_profile: 

168 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_request") 

169 

170 if request.host_user_id == context.user_id: 

171 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_request_self") 

172 

173 # just to check host exists and is visible 

174 host = session.execute( 

175 select(User).where_users_visible(context).where(User.id == request.host_user_id) 

176 ).scalar_one_or_none() 

177 if not host: 

178 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

179 

180 from_date = parse_date(request.from_date) 

181 to_date = parse_date(request.to_date) 

182 

183 if not from_date or not to_date: 

184 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_date") 

185 

186 today = today_in_timezone(host.timezone) 

187 

188 # request starts from the past 

189 if from_date < today: 

190 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_before_today") 

191 

192 # from_date is not >= to_date 

193 if from_date >= to_date: 

194 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_to") 

195 

196 # No need to check today > to_date 

197 

198 if from_date - today > timedelta(days=365): 

199 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_one_year") 

200 

201 if to_date - from_date > timedelta(days=365): 

202 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_to_after_one_year") 

203 

204 # Check if user has been sending host requests excessively 

205 if process_rate_limits_and_check_abort( 

206 session=session, user_id=context.user_id, action=RateLimitAction.host_request 

207 ): 

208 context.abort_with_error_code( 

209 grpc.StatusCode.RESOURCE_EXHAUSTED, 

210 "host_request_rate_limit", 

211 substitutions={"hours": RATE_LIMIT_HOURS}, 

212 ) 

213 

214 conversation = Conversation() 

215 session.add(conversation) 

216 session.flush() 

217 

218 session.add( 

219 Message( 

220 conversation_id=conversation.id, 

221 author_id=context.user_id, 

222 message_type=MessageType.chat_created, 

223 ) 

224 ) 

225 

226 message = Message( 

227 conversation_id=conversation.id, 

228 author_id=context.user_id, 

229 text=request.text, 

230 message_type=MessageType.text, 

231 ) 

232 session.add(message) 

233 session.flush() 

234 

235 host_request = HostRequest( 

236 conversation_id=conversation.id, 

237 surfer_user_id=context.user_id, 

238 host_user_id=host.id, 

239 from_date=from_date, 

240 to_date=to_date, 

241 status=HostRequestStatus.pending, 

242 surfer_last_seen_message_id=message.id, 

243 # TODO: tz 

244 # timezone=host.timezone, 

245 hosting_city=host.city, 

246 hosting_location=host.geom, 

247 hosting_radius=host.geom_radius, 

248 ) 

249 session.add(host_request) 

250 session.commit() 

251 

252 notify( 

253 session, 

254 user_id=host_request.host_user_id, 

255 topic_action="host_request:create", 

256 key=host_request.conversation_id, 

257 data=notification_data_pb2.HostRequestCreate( 

258 host_request=host_request_to_pb(host_request, session, context), 

259 surfer=user_model_to_pb(host_request.surfer, session, context), 

260 text=request.text, 

261 ), 

262 ) 

263 

264 host_requests_sent_counter.labels(user.gender, host.gender).inc() 

265 sent_messages_counter.labels(user.gender, "host request send").inc() 

266 account_age_on_host_request_create_histogram.labels(user.gender, host.gender).observe( 

267 (now() - user.joined).total_seconds() 

268 ) 

269 

270 return requests_pb2.CreateHostRequestRes(host_request_id=host_request.conversation_id) 

271 

272 def GetHostRequest(self, request, context, session): 

273 host_request = session.execute( 

274 select(HostRequest) 

275 .where_users_column_visible(context, HostRequest.surfer_user_id) 

276 .where_users_column_visible(context, HostRequest.host_user_id) 

277 .where(HostRequest.conversation_id == request.host_request_id) 

278 .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id)) 

279 ).scalar_one_or_none() 

280 

281 if not host_request: 

282 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

283 

284 return host_request_to_pb(host_request, session, context) 

285 

286 def ListHostRequests(self, request, context, session): 

287 if request.only_sent and request.only_received: 

288 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received") 

289 

290 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH 

291 pagination = min(pagination, MAX_PAGE_SIZE) 

292 

293 # By outer joining messages on itself where the second id is bigger, only the highest IDs will have 

294 # none as message_2.id. So just filter for these ones to get highest messages only. 

295 # See https://stackoverflow.com/a/27802817/6115336 

296 message_2 = aliased(Message) 

297 statement = ( 

298 select(Message, HostRequest, Conversation) 

299 .outerjoin(message_2, and_(Message.conversation_id == message_2.conversation_id, Message.id < message_2.id)) 

300 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id) 

301 .join(Conversation, Conversation.id == HostRequest.conversation_id) 

302 .where_users_column_visible(context, HostRequest.surfer_user_id) 

303 .where_users_column_visible(context, HostRequest.host_user_id) 

304 .where(message_2.id == None) 

305 .where(or_(Message.id < request.last_request_id, request.last_request_id == 0)) 

306 ) 

307 

308 if request.only_sent: 

309 statement = statement.where(HostRequest.surfer_user_id == context.user_id) 

310 elif request.only_received: 

311 statement = statement.where(HostRequest.host_user_id == context.user_id) 

312 elif request.HasField("only_archived"): 

313 statement = statement.where( 

314 or_( 

315 and_( 

316 HostRequest.surfer_user_id == context.user_id, 

317 HostRequest.is_surfer_archived == request.only_archived, 

318 ), 

319 and_( 

320 HostRequest.host_user_id == context.user_id, 

321 HostRequest.is_host_archived == request.only_archived, 

322 ), 

323 ) 

324 ) 

325 else: 

326 statement = statement.where( 

327 or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id) 

328 ) 

329 

330 # TODO: I considered having the latest control message be the single source of truth for 

331 # the HostRequest.status, but decided against it because of this filter. 

332 # Another possibility is to filter in the python instead of SQL, but that's slower 

333 if request.only_active: 

334 statement = statement.where( 

335 or_( 

336 HostRequest.status == HostRequestStatus.pending, 

337 HostRequest.status == HostRequestStatus.accepted, 

338 HostRequest.status == HostRequestStatus.confirmed, 

339 ) 

340 ) 

341 statement = statement.where(HostRequest.end_time <= func.now()) 

342 

343 statement = statement.order_by(Message.id.desc()).limit(pagination + 1) 

344 results = session.execute(statement).all() 

345 

346 host_requests = [] 

347 for result in results[:pagination]: 

348 lat, lng = get_coordinates(result.HostRequest.hosting_location) 

349 host_requests.append( 

350 requests_pb2.HostRequest( 

351 host_request_id=result.HostRequest.conversation_id, 

352 surfer_user_id=result.HostRequest.surfer_user_id, 

353 host_user_id=result.HostRequest.host_user_id, 

354 status=hostrequeststatus2api[result.HostRequest.status], 

355 created=Timestamp_from_datetime(result.Conversation.created), 

356 from_date=date_to_api(result.HostRequest.from_date), 

357 to_date=date_to_api(result.HostRequest.to_date), 

358 last_seen_message_id=( 

359 result.HostRequest.surfer_last_seen_message_id 

360 if context.user_id == result.HostRequest.surfer_user_id 

361 else result.HostRequest.host_last_seen_message_id 

362 ), 

363 latest_message=message_to_pb(result.Message), 

364 hosting_city=result.HostRequest.hosting_city, 

365 hosting_lat=lat, 

366 hosting_lng=lng, 

367 hosting_radius=result.HostRequest.hosting_radius, 

368 ) 

369 ) 

370 

371 last_request_id = min(g.Message.id for g in results[:pagination]) if len(results) > pagination else 0 # TODO 

372 no_more = len(results) <= pagination 

373 

374 return requests_pb2.ListHostRequestsRes( 

375 last_request_id=last_request_id, no_more=no_more, host_requests=host_requests 

376 ) 

377 

378 def RespondHostRequest(self, request, context, session): 

379 def count_host_response(other_user_id, response_type): 

380 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one() 

381 other_gender = session.execute(select(User.gender).where(User.id == other_user_id)).scalar_one() 

382 host_request_responses_counter.labels(user_gender, other_gender, response_type).inc() 

383 sent_messages_counter.labels(user_gender, "host request response").inc() 

384 

385 host_request = session.execute( 

386 select(HostRequest) 

387 .where_users_column_visible(context, HostRequest.surfer_user_id) 

388 .where_users_column_visible(context, HostRequest.host_user_id) 

389 .where(HostRequest.conversation_id == request.host_request_id) 

390 ).scalar_one_or_none() 

391 

392 if not host_request: 

393 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

394 

395 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 

396 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

397 

398 if request.status == conversations_pb2.HOST_REQUEST_STATUS_PENDING: 

399 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

400 

401 if host_request.end_time < now(): 

402 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_in_past") 

403 

404 control_message = Message() 

405 

406 if request.status == conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED: 

407 # only host can accept 

408 if context.user_id != host_request.host_user_id: 

409 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "not_the_host") 

410 # can't accept a cancelled or confirmed request (only reject), or already accepted 

411 if ( 

412 host_request.status == HostRequestStatus.cancelled 

413 or host_request.status == HostRequestStatus.confirmed 

414 or host_request.status == HostRequestStatus.accepted 

415 ): 

416 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

417 _possibly_observe_first_response_time(session, host_request, context.user_id, "accepted") 

418 control_message.host_request_status_target = HostRequestStatus.accepted 

419 host_request.status = HostRequestStatus.accepted 

420 session.flush() 

421 

422 notify( 

423 session, 

424 user_id=host_request.surfer_user_id, 

425 topic_action="host_request:accept", 

426 key=host_request.conversation_id, 

427 data=notification_data_pb2.HostRequestAccept( 

428 host_request=host_request_to_pb(host_request, session, context), 

429 host=user_model_to_pb(host_request.host, session, context), 

430 ), 

431 ) 

432 

433 count_host_response(host_request.surfer_user_id, "accepted") 

434 

435 if request.status == conversations_pb2.HOST_REQUEST_STATUS_REJECTED: 

436 # only host can reject 

437 if context.user_id != host_request.host_user_id: 

438 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

439 # can't reject a cancelled or already rejected request 

440 if host_request.status == HostRequestStatus.cancelled or host_request.status == HostRequestStatus.rejected: 

441 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

442 _possibly_observe_first_response_time(session, host_request, context.user_id, "rejected") 

443 control_message.host_request_status_target = HostRequestStatus.rejected 

444 host_request.status = HostRequestStatus.rejected 

445 session.flush() 

446 

447 notify( 

448 session, 

449 user_id=host_request.surfer_user_id, 

450 topic_action="host_request:reject", 

451 key=host_request.conversation_id, 

452 data=notification_data_pb2.HostRequestReject( 

453 host_request=host_request_to_pb(host_request, session, context), 

454 host=user_model_to_pb(host_request.host, session, context), 

455 ), 

456 ) 

457 

458 count_host_response(host_request.surfer_user_id, "rejected") 

459 

460 if request.status == conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED: 

461 # only surfer can confirm 

462 if context.user_id != host_request.surfer_user_id: 

463 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

464 # can only confirm an accepted request 

465 if host_request.status != HostRequestStatus.accepted: 

466 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

467 control_message.host_request_status_target = HostRequestStatus.confirmed 

468 host_request.status = HostRequestStatus.confirmed 

469 session.flush() 

470 

471 notify( 

472 session, 

473 user_id=host_request.host_user_id, 

474 topic_action="host_request:confirm", 

475 key=host_request.conversation_id, 

476 data=notification_data_pb2.HostRequestConfirm( 

477 host_request=host_request_to_pb(host_request, session, context), 

478 surfer=user_model_to_pb(host_request.surfer, session, context), 

479 ), 

480 ) 

481 

482 count_host_response(host_request.host_user_id, "confirmed") 

483 

484 if request.status == conversations_pb2.HOST_REQUEST_STATUS_CANCELLED: 

485 # only surfer can cancel 

486 if context.user_id != host_request.surfer_user_id: 

487 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

488 # can't' cancel an already cancelled or rejected request 

489 if host_request.status == HostRequestStatus.rejected or host_request.status == HostRequestStatus.cancelled: 

490 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status") 

491 control_message.host_request_status_target = HostRequestStatus.cancelled 

492 host_request.status = HostRequestStatus.cancelled 

493 session.flush() 

494 

495 notify( 

496 session, 

497 user_id=host_request.host_user_id, 

498 topic_action="host_request:cancel", 

499 key=host_request.conversation_id, 

500 data=notification_data_pb2.HostRequestCancel( 

501 host_request=host_request_to_pb(host_request, session, context), 

502 surfer=user_model_to_pb(host_request.surfer, session, context), 

503 ), 

504 ) 

505 

506 count_host_response(host_request.host_user_id, "cancelled") 

507 

508 control_message.message_type = MessageType.host_request_status_changed 

509 control_message.conversation_id = host_request.conversation_id 

510 control_message.author_id = context.user_id 

511 session.add(control_message) 

512 

513 if request.text: 

514 latest_message = Message() 

515 latest_message.conversation_id = host_request.conversation_id 

516 latest_message.text = request.text 

517 latest_message.author_id = context.user_id 

518 latest_message.message_type = MessageType.text 

519 session.add(latest_message) 

520 else: 

521 latest_message = control_message 

522 

523 session.flush() 

524 

525 if host_request.surfer_user_id == context.user_id: 

526 host_request.surfer_last_seen_message_id = latest_message.id 

527 else: 

528 host_request.host_last_seen_message_id = latest_message.id 

529 session.commit() 

530 

531 return empty_pb2.Empty() 

532 

533 def GetHostRequestMessages(self, request, context, session): 

534 host_request = session.execute( 

535 select(HostRequest).where(HostRequest.conversation_id == request.host_request_id) 

536 ).scalar_one_or_none() 

537 

538 if not host_request: 

539 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

540 

541 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 

542 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

543 

544 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH 

545 pagination = min(pagination, MAX_PAGE_SIZE) 

546 

547 messages = ( 

548 session.execute( 

549 select(Message) 

550 .where(Message.conversation_id == host_request.conversation_id) 

551 .where(or_(Message.id < request.last_message_id, request.last_message_id == 0)) 

552 .order_by(Message.id.desc()) 

553 .limit(pagination + 1) 

554 ) 

555 .scalars() 

556 .all() 

557 ) 

558 

559 no_more = len(messages) <= pagination 

560 

561 last_message_id = min(m.id if m else 1 for m in messages[:pagination]) if len(messages) > 0 else 0 

562 

563 return requests_pb2.GetHostRequestMessagesRes( 

564 last_message_id=last_message_id, 

565 no_more=no_more, 

566 messages=[message_to_pb(message) for message in messages[:pagination]], 

567 ) 

568 

569 def SendHostRequestMessage(self, request, context, session): 

570 if request.text == "": 

571 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message") 

572 host_request = session.execute( 

573 select(HostRequest).where(HostRequest.conversation_id == request.host_request_id) 

574 ).scalar_one_or_none() 

575 

576 if not host_request: 

577 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

578 

579 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 

580 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

581 

582 if host_request.host_user_id == context.user_id: 

583 _possibly_observe_first_response_time(session, host_request, context.user_id, "message") 

584 

585 message = Message() 

586 message.conversation_id = host_request.conversation_id 

587 message.author_id = context.user_id 

588 message.message_type = MessageType.text 

589 message.text = request.text 

590 session.add(message) 

591 session.flush() 

592 

593 if host_request.surfer_user_id == context.user_id: 

594 host_request.surfer_last_seen_message_id = message.id 

595 

596 notify( 

597 session, 

598 user_id=host_request.host_user_id, 

599 topic_action="host_request:message", 

600 key=host_request.conversation_id, 

601 data=notification_data_pb2.HostRequestMessage( 

602 host_request=host_request_to_pb(host_request, session, context), 

603 user=user_model_to_pb(host_request.surfer, session, context), 

604 text=request.text, 

605 am_host=True, 

606 ), 

607 ) 

608 

609 else: 

610 host_request.host_last_seen_message_id = message.id 

611 

612 notify( 

613 session, 

614 user_id=host_request.surfer_user_id, 

615 topic_action="host_request:message", 

616 key=host_request.conversation_id, 

617 data=notification_data_pb2.HostRequestMessage( 

618 host_request=host_request_to_pb(host_request, session, context), 

619 user=user_model_to_pb(host_request.host, session, context), 

620 text=request.text, 

621 am_host=False, 

622 ), 

623 ) 

624 

625 session.commit() 

626 

627 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one() 

628 sent_messages_counter.labels(user_gender, "host request").inc() 

629 

630 return empty_pb2.Empty() 

631 

632 def GetHostRequestUpdates(self, request, context, session): 

633 if request.only_sent and request.only_received: 

634 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received") 

635 

636 if request.newest_message_id == 0: 

637 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message") 

638 

639 if not session.execute(select(Message).where(Message.id == request.newest_message_id)).scalar_one_or_none(): 

640 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message") 

641 

642 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH 

643 pagination = min(pagination, MAX_PAGE_SIZE) 

644 

645 statement = ( 

646 select( 

647 Message, 

648 HostRequest.status.label("host_request_status"), 

649 HostRequest.conversation_id.label("host_request_id"), 

650 ) 

651 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id) 

652 .where(Message.id > request.newest_message_id) 

653 ) 

654 

655 if request.only_sent: 

656 statement = statement.where(HostRequest.surfer_user_id == context.user_id) 

657 elif request.only_received: 

658 statement = statement.where(HostRequest.host_user_id == context.user_id) 

659 else: 

660 statement = statement.where( 

661 or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id) 

662 ) 

663 

664 statement = statement.order_by(Message.id.asc()).limit(pagination + 1) 

665 res = session.execute(statement).all() 

666 

667 no_more = len(res) <= pagination 

668 

669 last_message_id = min(m.Message.id if m else 1 for m in res[:pagination]) if len(res) > 0 else 0 # TODO 

670 

671 return requests_pb2.GetHostRequestUpdatesRes( 

672 no_more=no_more, 

673 updates=[ 

674 requests_pb2.HostRequestUpdate( 

675 host_request_id=result.host_request_id, 

676 status=hostrequeststatus2api[result.host_request_status], 

677 message=message_to_pb(result.Message), 

678 ) 

679 for result in res[:pagination] 

680 ], 

681 ) 

682 

683 def MarkLastSeenHostRequest(self, request, context, session): 

684 host_request = session.execute( 

685 select(HostRequest).where(HostRequest.conversation_id == request.host_request_id) 

686 ).scalar_one_or_none() 

687 

688 if not host_request: 

689 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

690 

691 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 

692 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

693 

694 if host_request.surfer_user_id == context.user_id: 

695 if not host_request.surfer_last_seen_message_id <= request.last_seen_message_id: 

696 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages") 

697 host_request.surfer_last_seen_message_id = request.last_seen_message_id 

698 else: 

699 if not host_request.host_last_seen_message_id <= request.last_seen_message_id: 

700 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages") 

701 host_request.host_last_seen_message_id = request.last_seen_message_id 

702 

703 session.commit() 

704 return empty_pb2.Empty() 

705 

706 def SetHostRequestArchiveStatus(self, request, context, session): 

707 host_request: HostRequest = session.execute( 

708 select(HostRequest) 

709 .where(HostRequest.conversation_id == request.host_request_id) 

710 .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id)) 

711 ).scalar_one_or_none() 

712 

713 if not host_request: 

714 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

715 

716 if context.user_id == host_request.surfer_user_id: 

717 host_request.is_surfer_archived = request.is_archived 

718 else: 

719 host_request.is_host_archived = request.is_archived 

720 

721 return requests_pb2.SetHostRequestArchiveStatusRes( 

722 host_request_id=host_request.conversation_id, 

723 is_archived=request.is_archived, 

724 ) 

725 

726 def GetResponseRate(self, request, context, session): 

727 user_res = session.execute( 

728 select(User.id, UserResponseRate) 

729 .outerjoin(UserResponseRate, UserResponseRate.user_id == User.id) 

730 .where_users_visible(context) 

731 .where(User.id == request.user_id) 

732 ).one_or_none() 

733 

734 # if user doesn't exist, return None 

735 if not user_res: 

736 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

737 

738 user, response_rates = user_res 

739 return requests_pb2.GetResponseRateRes(**response_rate_to_pb(response_rates)) 

740 

741 def SendHostRequestFeedback(self, request, context, session): 

742 host_request = session.execute( 

743 select(HostRequest) 

744 .where(HostRequest.conversation_id == request.host_request_id) 

745 .where(HostRequest.host_user_id == context.user_id) 

746 ).scalar_one_or_none() 

747 

748 if not host_request: 

749 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found") 

750 

751 feedback = session.execute( 

752 select(HostRequestFeedback) 

753 .where(HostRequestFeedback.host_request_id == host_request.conversation_id) 

754 .where(HostRequestFeedback.from_user_id == context.user_id) 

755 ).scalar_one_or_none() 

756 

757 if feedback: 

758 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_left_host_request_feedback") 

759 

760 session.add( 

761 HostRequestFeedback( 

762 host_request_id=host_request.conversation_id, 

763 from_user_id=host_request.host_user_id, 

764 to_user_id=host_request.surfer_user_id, 

765 request_quality=hostrequestquality2sql.get(request.host_request_quality), 

766 decline_reason=request.decline_reason, 

767 ) 

768 ) 

769 

770 return empty_pb2.Empty()