Coverage for src/couchers/servicers/requests.py: 93%

255 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-04-16 15:13 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy.orm import aliased 

7from sqlalchemy.sql import and_, func, or_ 

8 

9from couchers import errors 

10from couchers.materialized_views import user_response_rates 

11from couchers.metrics import ( 

12 account_age_on_host_request_create_histogram, 

13 host_request_first_response_histogram, 

14 host_request_responses_counter, 

15 host_requests_sent_counter, 

16 sent_messages_counter, 

17) 

18from couchers.models import Conversation, HostRequest, HostRequestStatus, Message, MessageType, User 

19from couchers.notifications.notify import notify 

20from couchers.servicers.api import response_rate_to_pb, user_model_to_pb 

21from couchers.sql import couchers_select as select 

22from couchers.utils import ( 

23 Timestamp_from_datetime, 

24 date_to_api, 

25 now, 

26 parse_date, 

27 today_in_timezone, 

28) 

29from proto import conversations_pb2, notification_data_pb2, requests_pb2, requests_pb2_grpc 

30 

logger = logging.getLogger(__name__)

# Page size used when the client does not ask for a positive number of results.
DEFAULT_PAGINATION_LENGTH: int = 10
# Upper bound applied to any page size requested by the client.
MAX_PAGE_SIZE: int = 50

# Translates the database-side host request status enum into the API (protobuf) enum value.
hostrequeststatus2api = {
    HostRequestStatus.pending: conversations_pb2.HOST_REQUEST_STATUS_PENDING,
    HostRequestStatus.accepted: conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
    HostRequestStatus.rejected: conversations_pb2.HOST_REQUEST_STATUS_REJECTED,
    HostRequestStatus.confirmed: conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED,
    HostRequestStatus.cancelled: conversations_pb2.HOST_REQUEST_STATUS_CANCELLED,
}

44 

45 

def message_to_pb(message: Message):
    """
    Converts a database message into its protocol buffer representation.

    Normal messages carry their text; any other message is a control message and carries either a "chat created"
    or a "host request status changed" payload, depending on its message type.
    """
    # fields shared by every kind of message
    common_fields = dict(
        message_id=message.id,
        author_user_id=message.author_id,
        time=Timestamp_from_datetime(message.time),
    )

    if message.is_normal_message:
        return conversations_pb2.Message(
            text=conversations_pb2.MessageContentText(text=message.text),
            **common_fields,
        )

    # control message: at most one of the two payloads below is populated
    chat_created_content = None
    if message.message_type == MessageType.chat_created:
        chat_created_content = conversations_pb2.MessageContentChatCreated()

    status_changed_content = None
    if message.message_type == MessageType.host_request_status_changed:
        status_changed_content = conversations_pb2.MessageContentHostRequestStatusChanged(
            status=hostrequeststatus2api[message.host_request_status_target]
        )

    return conversations_pb2.Message(
        chat_created=chat_created_content,
        host_request_status_changed=status_changed_content,
        **common_fields,
    )

75 

76 

def host_request_to_pb(host_request: HostRequest, session, context):
    """
    Builds the protocol buffer for a host request as seen by the calling user.

    The creation time is taken from the oldest message of the request's conversation, and the newest message of
    that conversation is attached as `latest_message`. The `last_seen_message_id` is the surfer's marker when the
    caller is the surfer, otherwise the host's marker.
    """
    messages_in_conversation = select(Message).where(Message.conversation_id == host_request.conversation_id)

    # lowest message id in the conversation
    oldest_message = session.execute(messages_in_conversation.order_by(Message.id.asc()).limit(1)).scalar_one()
    # highest message id in the conversation
    newest_message = session.execute(messages_in_conversation.order_by(Message.id.desc()).limit(1)).scalar_one()

    if context.user_id == host_request.surfer_user_id:
        last_seen_message_id = host_request.surfer_last_seen_message_id
    else:
        last_seen_message_id = host_request.host_last_seen_message_id

    return requests_pb2.HostRequest(
        host_request_id=host_request.conversation_id,
        surfer_user_id=host_request.surfer_user_id,
        host_user_id=host_request.host_user_id,
        status=hostrequeststatus2api[host_request.status],
        created=Timestamp_from_datetime(oldest_message.time),
        from_date=date_to_api(host_request.from_date),
        to_date=date_to_api(host_request.to_date),
        last_seen_message_id=last_seen_message_id,
        latest_message=message_to_pb(newest_message),
    )

107 

108 

def _possibly_observe_first_response_time(session, host_request, user_id, response_type):
    """
    Records the host's first-response time for a host request, if this is the host's first message.

    Must be called with the host's user id, and before the new message is added: the observation only happens
    when the host has not yet authored any message in the conversation. The observed value is the number of
    seconds elapsed since the conversation was created, labelled by host gender, surfer gender and response type.
    """
    assert host_request.host_user_id == user_id

    host_message_count_query = (
        select(func.count())
        .where(Message.conversation_id == host_request.conversation_id)
        .where(Message.author_id == user_id)
    )
    host_message_count = session.execute(host_message_count_query).scalar_one_or_none()

    # the host already wrote something earlier, so this is not their first response
    if host_message_count != 0:
        return

    host_gender = session.execute(select(User.gender).where(User.id == host_request.host_user_id)).scalar_one()
    surfer_gender = session.execute(select(User.gender).where(User.id == host_request.surfer_user_id)).scalar_one()

    seconds_to_respond = (now() - host_request.conversation.created).total_seconds()
    host_request_first_response_histogram.labels(host_gender, surfer_gender, response_type).observe(
        seconds_to_respond
    )

125 

126 

127class Requests(requests_pb2_grpc.RequestsServicer): 

def CreateHostRequest(self, request, context, session):
    """
    Creates a host request from the calling user (the surfer) to `request.host_user_id`.

    Aborts with FAILED_PRECONDITION if the caller's profile is incomplete, with NOT_FOUND if the host does not
    exist or is not visible to the caller, and with INVALID_ARGUMENT if the caller requests themselves or the
    dates are invalid.

    On success a new conversation is created containing a "chat created" control message followed by the request
    text, the host is notified, metrics are recorded, and the id of the new request is returned.
    """
    surfer = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
    if not surfer.has_completed_profile:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.INCOMPLETE_PROFILE_SEND_REQUEST)

    if request.host_user_id == context.user_id:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.CANT_REQUEST_SELF)

    # the host has to exist and be visible to the caller
    visible_host_query = select(User).where_users_visible(context).where(User.id == request.host_user_id)
    host = session.execute(visible_host_query).scalar_one_or_none()
    if not host:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    from_date = parse_date(request.from_date)
    to_date = parse_date(request.to_date)

    if not (from_date and to_date):
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_DATE)

    # "today" is evaluated in the host's timezone
    today = today_in_timezone(host.timezone)
    one_year = timedelta(days=365)

    # the stay may not start in the past
    if from_date < today:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_FROM_BEFORE_TODAY)

    # the stay has to end strictly after it starts; together with the check above this also guarantees that
    # the end date is not in the past
    if to_date <= from_date:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_FROM_AFTER_TO)

    # the stay may start at most 365 days from today
    if from_date - today > one_year:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_FROM_AFTER_ONE_YEAR)

    # the stay may last at most 365 days
    if to_date - from_date > one_year:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_TO_AFTER_ONE_YEAR)

    conversation = Conversation()
    session.add(conversation)
    # flush so that the conversation id is available for the messages below
    session.flush()

    chat_created_message = Message(
        conversation_id=conversation.id,
        author_id=context.user_id,
        message_type=MessageType.chat_created,
    )
    session.add(chat_created_message)

    request_text_message = Message(
        conversation_id=conversation.id,
        author_id=context.user_id,
        text=request.text,
        message_type=MessageType.text,
    )
    session.add(request_text_message)
    # flush so that the text message id can be stored as the surfer's last seen message
    session.flush()

    host_request = HostRequest(
        conversation_id=conversation.id,
        surfer_user_id=context.user_id,
        host_user_id=host.id,
        from_date=from_date,
        to_date=to_date,
        status=HostRequestStatus.pending,
        surfer_last_seen_message_id=request_text_message.id,
        # TODO: tz
        # timezone=host.timezone,
    )
    session.add(host_request)
    session.commit()

    notification_data = notification_data_pb2.HostRequestCreate(
        host_request=host_request_to_pb(host_request, session, context),
        surfer=user_model_to_pb(host_request.surfer, session, context),
        text=request.text,
    )
    notify(
        session,
        user_id=host_request.host_user_id,
        topic_action="host_request:create",
        key=host_request.conversation_id,
        data=notification_data,
    )

    host_requests_sent_counter.labels(surfer.gender, host.gender).inc()
    sent_messages_counter.labels(surfer.gender, "host request send").inc()
    account_age_seconds = (now() - surfer.joined).total_seconds()
    account_age_on_host_request_create_histogram.labels(surfer.gender, host.gender).observe(account_age_seconds)

    return requests_pb2.CreateHostRequestRes(host_request_id=host_request.conversation_id)

221 

def GetHostRequest(self, request, context, session):
    """
    Returns the host request with id `request.host_request_id`.

    Only requests in which the caller is the surfer or the host, and where both users are visible to the caller,
    can be fetched; anything else aborts with NOT_FOUND.
    """
    caller_is_participant = or_(
        HostRequest.surfer_user_id == context.user_id,
        HostRequest.host_user_id == context.user_id,
    )
    lookup = (
        select(HostRequest)
        .where_users_column_visible(context, HostRequest.surfer_user_id)
        .where_users_column_visible(context, HostRequest.host_user_id)
        .where(HostRequest.conversation_id == request.host_request_id)
        .where(caller_is_participant)
    )
    host_request = session.execute(lookup).scalar_one_or_none()

    if not host_request:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

    return host_request_to_pb(host_request, session, context)

235 

def ListHostRequests(self, request, context, session):
    """
    Lists host requests the caller takes part in, ordered by their latest message, newest first.

    `only_sent` restricts the list to requests where the caller is the surfer, `only_received` to requests where
    the caller is the host; setting both aborts with INVALID_ARGUMENT. `only_active` keeps only pending, accepted
    or confirmed requests that have not ended yet.

    Pagination is keyed on the id of each request's latest message: pass the returned `last_request_id` back in
    to fetch the next page. `last_request_id` is 0 and `no_more` is true once there is nothing left.
    """
    if request.only_sent and request.only_received:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.HOST_REQUEST_SENT_OR_RECEIVED)

    pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
    pagination = min(pagination, MAX_PAGE_SIZE)

    # By outer joining messages on itself where the second id is bigger, only the highest IDs will have
    # none as message_2.id. So just filter for these ones to get highest messages only.
    # See https://stackoverflow.com/a/27802817/6115336
    message_2 = aliased(Message)
    statement = (
        select(Message, HostRequest, Conversation)
        .outerjoin(message_2, and_(Message.conversation_id == message_2.conversation_id, Message.id < message_2.id))
        .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
        .join(Conversation, Conversation.id == HostRequest.conversation_id)
        .where_users_column_visible(context, HostRequest.surfer_user_id)
        .where_users_column_visible(context, HostRequest.host_user_id)
        .where(message_2.id.is_(None))
        .where(or_(Message.id < request.last_request_id, request.last_request_id == 0))
    )

    if request.only_sent:
        statement = statement.where(HostRequest.surfer_user_id == context.user_id)
    elif request.only_received:
        statement = statement.where(HostRequest.host_user_id == context.user_id)
    else:
        statement = statement.where(
            or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id)
        )

    # TODO: I considered having the latest control message be the single source of truth for
    # the HostRequest.status, but decided against it because of this filter.
    # Another possibility is to filter in the python instead of SQL, but that's slower
    if request.only_active:
        statement = statement.where(
            or_(
                HostRequest.status == HostRequestStatus.pending,
                HostRequest.status == HostRequestStatus.accepted,
                HostRequest.status == HostRequestStatus.confirmed,
            )
        )
        # An active request is one that has not ended yet. RespondHostRequest treats `end_time < now()` as
        # "in the past", so the active filter must keep rows whose end time is still ahead of (or at) now.
        statement = statement.where(HostRequest.end_time >= func.now())

    # fetch one extra row to find out whether another page exists
    statement = statement.order_by(Message.id.desc()).limit(pagination + 1)
    results = session.execute(statement).all()
    page = results[:pagination]

    host_requests = [
        requests_pb2.HostRequest(
            host_request_id=result.HostRequest.conversation_id,
            surfer_user_id=result.HostRequest.surfer_user_id,
            host_user_id=result.HostRequest.host_user_id,
            status=hostrequeststatus2api[result.HostRequest.status],
            created=Timestamp_from_datetime(result.Conversation.created),
            from_date=date_to_api(result.HostRequest.from_date),
            to_date=date_to_api(result.HostRequest.to_date),
            last_seen_message_id=(
                result.HostRequest.surfer_last_seen_message_id
                if context.user_id == result.HostRequest.surfer_user_id
                else result.HostRequest.host_last_seen_message_id
            ),
            latest_message=message_to_pb(result.Message),
        )
        for result in page
    ]

    has_more = len(results) > pagination
    # the cursor is the lowest latest-message id on this page, or 0 when this was the last page
    last_request_id = min(row.Message.id for row in page) if has_more else 0  # TODO
    no_more = not has_more

    return requests_pb2.ListHostRequestsRes(
        last_request_id=last_request_id, no_more=no_more, host_requests=host_requests
    )

307 

def RespondHostRequest(self, request, context, session):
    """
    Changes the status of a host request and optionally attaches a text message.

    The host may accept or reject; the surfer may confirm (only an accepted request) or cancel. Setting the
    status back to pending is never allowed. Aborts with NOT_FOUND if the request does not exist, is not visible
    or the caller is not part of it, with INVALID_ARGUMENT if the request has already ended, and with
    PERMISSION_DENIED for transitions that are not allowed.

    On success the other party is notified, a control message recording the new status is added to the
    conversation (followed by a text message when `request.text` is set), and the caller's last seen message is
    moved to the newest of those messages.
    """

    def record_response_metrics(counterpart_user_id, response_type):
        # bumps the response counters, labelled with the genders of the caller and the other party
        caller_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
        counterpart_gender = session.execute(select(User.gender).where(User.id == counterpart_user_id)).scalar_one()
        host_request_responses_counter.labels(caller_gender, counterpart_gender, response_type).inc()
        sent_messages_counter.labels(caller_gender, "host request response").inc()

    lookup = (
        select(HostRequest)
        .where_users_column_visible(context, HostRequest.surfer_user_id)
        .where_users_column_visible(context, HostRequest.host_user_id)
        .where(HostRequest.conversation_id == request.host_request_id)
    )
    host_request = session.execute(lookup).scalar_one_or_none()

    if not host_request:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

    # callers who are neither the surfer nor the host get the same error as for a missing request
    if context.user_id not in (host_request.surfer_user_id, host_request.host_user_id):
        context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

    if request.status == conversations_pb2.HOST_REQUEST_STATUS_PENDING:
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)

    if host_request.end_time < now():
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.HOST_REQUEST_IN_PAST)

    control_message = Message()

    caller_is_host = context.user_id == host_request.host_user_id
    caller_is_surfer = context.user_id == host_request.surfer_user_id

    if request.status == conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED:
        # only host can accept
        if not caller_is_host:
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.NOT_THE_HOST)
        # can't accept a cancelled or confirmed request (only reject), or already accepted
        if host_request.status in (
            HostRequestStatus.cancelled,
            HostRequestStatus.confirmed,
            HostRequestStatus.accepted,
        ):
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
        # has to run before the control message is added, it counts the host's existing messages
        _possibly_observe_first_response_time(session, host_request, context.user_id, "accepted")
        control_message.host_request_status_target = HostRequestStatus.accepted
        host_request.status = HostRequestStatus.accepted
        session.flush()

        accept_data = notification_data_pb2.HostRequestAccept(
            host_request=host_request_to_pb(host_request, session, context),
            host=user_model_to_pb(host_request.host, session, context),
        )
        notify(
            session,
            user_id=host_request.surfer_user_id,
            topic_action="host_request:accept",
            key=host_request.conversation_id,
            data=accept_data,
        )

        record_response_metrics(host_request.surfer_user_id, "accepted")

    elif request.status == conversations_pb2.HOST_REQUEST_STATUS_REJECTED:
        # only host can reject
        if not caller_is_host:
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
        # can't reject a cancelled or already rejected request
        if host_request.status in (HostRequestStatus.cancelled, HostRequestStatus.rejected):
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
        # has to run before the control message is added, it counts the host's existing messages
        _possibly_observe_first_response_time(session, host_request, context.user_id, "rejected")
        control_message.host_request_status_target = HostRequestStatus.rejected
        host_request.status = HostRequestStatus.rejected
        session.flush()

        reject_data = notification_data_pb2.HostRequestReject(
            host_request=host_request_to_pb(host_request, session, context),
            host=user_model_to_pb(host_request.host, session, context),
        )
        notify(
            session,
            user_id=host_request.surfer_user_id,
            topic_action="host_request:reject",
            key=host_request.conversation_id,
            data=reject_data,
        )

        record_response_metrics(host_request.surfer_user_id, "rejected")

    elif request.status == conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED:
        # only surfer can confirm
        if not caller_is_surfer:
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
        # can only confirm an accepted request
        if host_request.status != HostRequestStatus.accepted:
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
        control_message.host_request_status_target = HostRequestStatus.confirmed
        host_request.status = HostRequestStatus.confirmed
        session.flush()

        confirm_data = notification_data_pb2.HostRequestConfirm(
            host_request=host_request_to_pb(host_request, session, context),
            surfer=user_model_to_pb(host_request.surfer, session, context),
        )
        notify(
            session,
            user_id=host_request.host_user_id,
            topic_action="host_request:confirm",
            key=host_request.conversation_id,
            data=confirm_data,
        )

        record_response_metrics(host_request.host_user_id, "confirmed")

    elif request.status == conversations_pb2.HOST_REQUEST_STATUS_CANCELLED:
        # only surfer can cancel
        if not caller_is_surfer:
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
        # can't cancel an already cancelled or rejected request
        if host_request.status in (HostRequestStatus.rejected, HostRequestStatus.cancelled):
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
        control_message.host_request_status_target = HostRequestStatus.cancelled
        host_request.status = HostRequestStatus.cancelled
        session.flush()

        cancel_data = notification_data_pb2.HostRequestCancel(
            host_request=host_request_to_pb(host_request, session, context),
            surfer=user_model_to_pb(host_request.surfer, session, context),
        )
        notify(
            session,
            user_id=host_request.host_user_id,
            topic_action="host_request:cancel",
            key=host_request.conversation_id,
            data=cancel_data,
        )

        record_response_metrics(host_request.host_user_id, "cancelled")

    # the control message records the status change inside the conversation
    control_message.message_type = MessageType.host_request_status_changed
    control_message.conversation_id = host_request.conversation_id
    control_message.author_id = context.user_id
    session.add(control_message)

    # an optional text message goes after the control message and becomes the newest message
    newest_message = control_message
    if request.text:
        newest_message = Message()
        newest_message.conversation_id = host_request.conversation_id
        newest_message.text = request.text
        newest_message.author_id = context.user_id
        newest_message.message_type = MessageType.text
        session.add(newest_message)

    # flush so that the new messages have ids
    session.flush()

    if caller_is_surfer:
        host_request.surfer_last_seen_message_id = newest_message.id
    else:
        host_request.host_last_seen_message_id = newest_message.id
    session.commit()

    return empty_pb2.Empty()

462 

def GetHostRequestMessages(self, request, context, session):
    """
    Returns a page of messages of a host request, newest first.

    Only the surfer and the host of the request may read it; otherwise, or if the request does not exist, the
    call aborts with NOT_FOUND. When `request.last_message_id` is non-zero only messages with a lower id are
    returned. `last_message_id` in the response is the lowest message id on the page (0 if nothing was found).
    """
    lookup = select(HostRequest).where(HostRequest.conversation_id == request.host_request_id)
    host_request = session.execute(lookup).scalar_one_or_none()

    if not host_request:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

    if context.user_id not in (host_request.surfer_user_id, host_request.host_user_id):
        context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

    pagination = min(request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH, MAX_PAGE_SIZE)

    # fetch one extra row to find out whether another page exists
    message_query = (
        select(Message)
        .where(Message.conversation_id == host_request.conversation_id)
        .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
        .order_by(Message.id.desc())
        .limit(pagination + 1)
    )
    messages = session.execute(message_query).scalars().all()
    page = messages[:pagination]

    no_more = len(messages) <= pagination

    if messages:
        last_message_id = min(m.id if m else 1 for m in page)
    else:
        last_message_id = 0

    return requests_pb2.GetHostRequestMessagesRes(
        last_message_id=last_message_id,
        no_more=no_more,
        messages=[message_to_pb(message) for message in page],
    )

498 

def SendHostRequestMessage(self, request, context, session):
    """
    Adds a text message to a host request's conversation and notifies the other party.

    Aborts with INVALID_ARGUMENT for an empty text, with NOT_FOUND if the request does not exist or the caller
    is not part of it, and with PERMISSION_DENIED if the request has been rejected or cancelled.

    The sender's last seen message is moved to the new message.
    """
    if request.text == "":
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE)

    lookup = select(HostRequest).where(HostRequest.conversation_id == request.host_request_id)
    host_request = session.execute(lookup).scalar_one_or_none()

    if not host_request:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

    if context.user_id not in (host_request.surfer_user_id, host_request.host_user_id):
        context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

    if host_request.status in (HostRequestStatus.rejected, HostRequestStatus.cancelled):
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.HOST_REQUEST_CLOSED)

    # has to run before the message is added, it counts the host's existing messages
    if host_request.host_user_id == context.user_id:
        _possibly_observe_first_response_time(session, host_request, context.user_id, "message")

    message = Message()
    message.conversation_id = host_request.conversation_id
    message.author_id = context.user_id
    message.message_type = MessageType.text
    message.text = request.text
    session.add(message)
    # flush so that the message has an id
    session.flush()

    sender_is_surfer = host_request.surfer_user_id == context.user_id
    if sender_is_surfer:
        host_request.surfer_last_seen_message_id = message.id
    else:
        host_request.host_last_seen_message_id = message.id

    # the notification goes to the other party and carries the sender's profile; `am_host` is set when the
    # recipient of the notification is the host
    notify(
        session,
        user_id=host_request.host_user_id if sender_is_surfer else host_request.surfer_user_id,
        topic_action="host_request:message",
        key=host_request.conversation_id,
        data=notification_data_pb2.HostRequestMessage(
            host_request=host_request_to_pb(host_request, session, context),
            user=user_model_to_pb(
                host_request.surfer if sender_is_surfer else host_request.host, session, context
            ),
            text=request.text,
            am_host=sender_is_surfer,
        ),
    )

    session.commit()

    sender_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
    sent_messages_counter.labels(sender_gender, "host request").inc()

    return empty_pb2.Empty()

564 

def GetHostRequestUpdates(self, request, context, session):
    """
    Returns messages newer than `request.newest_message_id` across the caller's host requests, oldest first.

    `only_sent` restricts the result to requests where the caller is the surfer, `only_received` to requests
    where the caller is the host; setting both aborts with INVALID_ARGUMENT. `newest_message_id` must be the id
    of an existing message, otherwise the call aborts with INVALID_ARGUMENT.

    Each update carries the message together with the id and current status of the host request it belongs to.
    `no_more` tells whether further updates exist beyond the returned page.
    """
    if request.only_sent and request.only_received:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.HOST_REQUEST_SENT_OR_RECEIVED)

    if request.newest_message_id == 0:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE)

    if not session.execute(select(Message).where(Message.id == request.newest_message_id)).scalar_one_or_none():
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE)

    pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
    pagination = min(pagination, MAX_PAGE_SIZE)

    statement = (
        select(
            Message,
            HostRequest.status.label("host_request_status"),
            HostRequest.conversation_id.label("host_request_id"),
        )
        .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
        .where(Message.id > request.newest_message_id)
    )

    if request.only_sent:
        statement = statement.where(HostRequest.surfer_user_id == context.user_id)
    elif request.only_received:
        statement = statement.where(HostRequest.host_user_id == context.user_id)
    else:
        statement = statement.where(
            or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id)
        )

    # fetch one extra row to find out whether another page exists
    statement = statement.order_by(Message.id.asc()).limit(pagination + 1)
    res = session.execute(statement).all()

    no_more = len(res) <= pagination

    # NOTE: the previous version also computed a `last_message_id` here that was never returned or otherwise
    # used; that dead computation has been removed.

    return requests_pb2.GetHostRequestUpdatesRes(
        no_more=no_more,
        updates=[
            requests_pb2.HostRequestUpdate(
                host_request_id=result.host_request_id,
                status=hostrequeststatus2api[result.host_request_status],
                message=message_to_pb(result.Message),
            )
            for result in res[:pagination]
        ],
    )

615 

def MarkLastSeenHostRequest(self, request, context, session):
    """
    Moves the caller's "last seen message" marker of a host request forward to `request.last_seen_message_id`.

    Aborts with NOT_FOUND if the request does not exist or the caller is not part of it, and with
    FAILED_PRECONDITION if the new marker would be lower than the current one.
    """
    lookup = select(HostRequest).where(HostRequest.conversation_id == request.host_request_id)
    host_request = session.execute(lookup).scalar_one_or_none()

    if not host_request:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

    if context.user_id not in (host_request.surfer_user_id, host_request.host_user_id):
        context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

    new_marker = request.last_seen_message_id

    # surfer and host each have their own marker; neither may be moved backwards
    if host_request.surfer_user_id == context.user_id:
        if host_request.surfer_last_seen_message_id > new_marker:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNSEE_MESSAGES)
        host_request.surfer_last_seen_message_id = new_marker
    else:
        if host_request.host_last_seen_message_id > new_marker:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNSEE_MESSAGES)
        host_request.host_last_seen_message_id = new_marker

    session.commit()
    return empty_pb2.Empty()

638 

def GetResponseRate(self, request, context, session):
    """
    Returns the response rate of the user with id `request.user_id`.

    The user is outer joined against the `user_response_rates` materialized view, so a visible user without a
    row in that view is still found. Aborts with NOT_FOUND if the user does not exist or is not visible to the
    caller.
    """
    response_rate_query = (
        select(User.id, user_response_rates)
        .outerjoin(user_response_rates, user_response_rates.c.user_id == User.id)
        .where_users_visible(context)
        .where(User.id == request.user_id)
    )
    row = session.execute(response_rate_query).one_or_none()

    # no row means the user doesn't exist or isn't visible to the caller
    if not row:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    # the first column is the user id, the remaining columns come from the response rate view
    _user_id, *response_rates = row
    return requests_pb2.GetResponseRateRes(**response_rate_to_pb(response_rates))