Coverage for src/couchers/servicers/requests.py: 92%

251 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-07-22 17:19 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy import Float 

7from sqlalchemy.orm import aliased 

8from sqlalchemy.sql import and_, func, or_ 

9from sqlalchemy.sql.functions import percentile_disc 

10 

11from couchers import errors 

12from couchers.db import session_scope 

13from couchers.models import Conversation, HostRequest, HostRequestStatus, Message, MessageType, User 

14from couchers.notifications.notify import notify 

15from couchers.servicers.api import user_model_to_pb 

16from couchers.sql import couchers_select as select 

17from couchers.utils import ( 

18 Duration_from_timedelta, 

19 Timestamp_from_datetime, 

20 date_to_api, 

21 now, 

22 parse_date, 

23 today_in_timezone, 

24) 

25from proto import conversations_pb2, notification_data_pb2, requests_pb2, requests_pb2_grpc 

26 

logger = logging.getLogger(__name__)

# Page size used when the client does not ask for a positive number of items
DEFAULT_PAGINATION_LENGTH = 10
# Upper bound applied to any client-requested page size
MAX_PAGE_SIZE = 50


# Maps the database-side HostRequestStatus enum to the API (protobuf) enum value
hostrequeststatus2api = {
    HostRequestStatus.pending: conversations_pb2.HOST_REQUEST_STATUS_PENDING,
    HostRequestStatus.accepted: conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
    HostRequestStatus.rejected: conversations_pb2.HOST_REQUEST_STATUS_REJECTED,
    HostRequestStatus.confirmed: conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED,
    HostRequestStatus.cancelled: conversations_pb2.HOST_REQUEST_STATUS_CANCELLED,
}

40 

41 

def message_to_pb(message: Message):
    """
    Converts a Message model into its conversations_pb2.Message representation.

    Normal messages carry their text; other messages carry either a "chat created" or a
    "host request status changed" payload, depending on the message type.
    """
    # fields shared by every kind of message
    envelope = {
        "message_id": message.id,
        "author_user_id": message.author_id,
        "time": Timestamp_from_datetime(message.time),
    }

    if message.is_normal_message:
        return conversations_pb2.Message(
            text=conversations_pb2.MessageContentText(text=message.text),
            **envelope,
        )

    # control message: at most one of the two payloads below gets populated
    chat_created = None
    if message.message_type == MessageType.chat_created:
        chat_created = conversations_pb2.MessageContentChatCreated()

    status_changed = None
    if message.message_type == MessageType.host_request_status_changed:
        status_changed = conversations_pb2.MessageContentHostRequestStatusChanged(
            status=hostrequeststatus2api[message.host_request_status_target]
        )

    return conversations_pb2.Message(
        chat_created=chat_created,
        host_request_status_changed=status_changed,
        **envelope,
    )

71 

72 

def host_request_to_pb(host_request: HostRequest, session, context):
    """
    Converts a HostRequest model into a requests_pb2.HostRequest.

    The creation time is taken from the first message of the request's conversation and
    the latest message from the last one (both by message id). The last seen message id is
    reported from the point of view of the calling user (context.user_id).
    """
    conversation_messages = select(Message).where(Message.conversation_id == host_request.conversation_id)

    first_message = session.execute(conversation_messages.order_by(Message.id.asc()).limit(1)).scalar_one()
    newest_message = session.execute(conversation_messages.order_by(Message.id.desc()).limit(1)).scalar_one()

    # anyone who is not the surfer gets the host's read marker
    if context.user_id == host_request.surfer_user_id:
        last_seen_message_id = host_request.surfer_last_seen_message_id
    else:
        last_seen_message_id = host_request.host_last_seen_message_id

    return requests_pb2.HostRequest(
        host_request_id=host_request.conversation_id,
        surfer_user_id=host_request.surfer_user_id,
        host_user_id=host_request.host_user_id,
        status=hostrequeststatus2api[host_request.status],
        created=Timestamp_from_datetime(first_message.time),
        from_date=date_to_api(host_request.from_date),
        to_date=date_to_api(host_request.to_date),
        last_seen_message_id=last_seen_message_id,
        latest_message=message_to_pb(newest_message),
    )

103 

104 

class Requests(requests_pb2_grpc.RequestsServicer):
    """
    gRPC servicer for host requests: creating and listing them, responding to them
    (accept/reject/confirm/cancel), exchanging messages and computing a host's response rate.
    """

    @staticmethod
    def _page_size(requested):
        """
        Returns the page size to use for a client-requested size: DEFAULT_PAGINATION_LENGTH when
        the request does not carry a positive number, and never more than MAX_PAGE_SIZE.
        """
        size = requested if requested > 0 else DEFAULT_PAGINATION_LENGTH
        return min(size, MAX_PAGE_SIZE)

    def CreateHostRequest(self, request, context):
        """
        Creates a pending host request from the calling user to request.host_user_id.

        A new conversation is created holding a "chat created" control message followed by the
        request text, and the host is notified. Aborts with INVALID_ARGUMENT for a self-request
        or invalid dates and with NOT_FOUND if the host does not exist or is not visible.
        """
        with session_scope() as session:
            if request.host_user_id == context.user_id:
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.CANT_REQUEST_SELF)

            # just to check host exists and is visible
            host = session.execute(
                select(User).where_users_visible(context).where(User.id == request.host_user_id)
            ).scalar_one_or_none()
            if not host:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

            from_date = parse_date(request.from_date)
            to_date = parse_date(request.to_date)

            if not from_date or not to_date:
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_DATE)

            # "today" is evaluated in the host's timezone
            today = today_in_timezone(host.timezone)

            # request starts from the past
            if from_date < today:
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_FROM_BEFORE_TODAY)

            # the request must end strictly after it starts
            if from_date >= to_date:
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_FROM_AFTER_TO)

            # No need to check today > to_date: it is implied by the two checks above

            if from_date - today > timedelta(days=365):
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_FROM_AFTER_ONE_YEAR)

            if to_date - from_date > timedelta(days=365):
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_TO_AFTER_ONE_YEAR)

            conversation = Conversation()
            session.add(conversation)
            # flush so that conversation.id is populated
            session.flush()

            session.add(
                Message(
                    conversation_id=conversation.id,
                    author_id=context.user_id,
                    message_type=MessageType.chat_created,
                )
            )

            message = Message(
                conversation_id=conversation.id,
                author_id=context.user_id,
                text=request.text,
                message_type=MessageType.text,
            )
            session.add(message)
            # flush so that message.id is populated
            session.flush()

            host_request = HostRequest(
                conversation_id=conversation.id,
                surfer_user_id=context.user_id,
                host_user_id=host.id,
                from_date=from_date,
                to_date=to_date,
                status=HostRequestStatus.pending,
                # the surfer has by definition seen their own initial message
                surfer_last_seen_message_id=message.id,
                # TODO: tz
                # timezone=host.timezone,
            )
            session.add(host_request)
            session.commit()

            notify(
                user_id=host_request.host_user_id,
                topic_action="host_request:create",
                key=host_request.conversation_id,
                data=notification_data_pb2.HostRequestCreate(
                    host_request=host_request_to_pb(host_request, session, context),
                    surfer=user_model_to_pb(host_request.surfer, session, context),
                    text=request.text,
                ),
            )

            return requests_pb2.CreateHostRequestRes(host_request_id=host_request.conversation_id)

    def GetHostRequest(self, request, context):
        """
        Returns a single host request the calling user takes part in (as surfer or host).

        Aborts with NOT_FOUND if there is no such request, if the caller is not a participant,
        or if either participant is filtered out by the user visibility clauses.
        """
        with session_scope() as session:
            host_request = session.execute(
                select(HostRequest)
                .where_users_column_visible(context, HostRequest.surfer_user_id)
                .where_users_column_visible(context, HostRequest.host_user_id)
                .where(HostRequest.conversation_id == request.host_request_id)
                .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id))
            ).scalar_one_or_none()

            if not host_request:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

            return host_request_to_pb(host_request, session, context)

    def ListHostRequests(self, request, context):
        """
        Lists the caller's host requests, newest latest-message first, one page at a time.

        request.only_sent / request.only_received restrict the list to requests where the caller
        is the surfer / the host (setting both aborts with INVALID_ARGUMENT). request.only_active
        keeps pending, accepted and confirmed requests that have not ended yet. Paging is keyed
        on the id of each request's latest message via request.last_request_id (0 = first page).
        """
        if request.only_sent and request.only_received:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.HOST_REQUEST_SENT_OR_RECEIVED)

        with session_scope() as session:
            pagination = self._page_size(request.number)

            # By outer joining messages on itself where the second id is bigger, only the highest IDs will have
            # none as message_2.id. So just filter for these ones to get highest messages only.
            # See https://stackoverflow.com/a/27802817/6115336
            message_2 = aliased(Message)
            statement = (
                select(Message, HostRequest, Conversation)
                .outerjoin(
                    message_2, and_(Message.conversation_id == message_2.conversation_id, Message.id < message_2.id)
                )
                .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
                .join(Conversation, Conversation.id == HostRequest.conversation_id)
                .where_users_column_visible(context, HostRequest.surfer_user_id)
                .where_users_column_visible(context, HostRequest.host_user_id)
                .where(message_2.id.is_(None))
                .where(or_(Message.id < request.last_request_id, request.last_request_id == 0))
            )

            if request.only_sent:
                statement = statement.where(HostRequest.surfer_user_id == context.user_id)
            elif request.only_received:
                statement = statement.where(HostRequest.host_user_id == context.user_id)
            else:
                statement = statement.where(
                    or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id)
                )

            # TODO: I considered having the latest control message be the single source of truth for
            # the HostRequest.status, but decided against it because of this filter.
            # Another possibility is to filter in the python instead of SQL, but that's slower
            if request.only_active:
                statement = statement.where(
                    or_(
                        HostRequest.status == HostRequestStatus.pending,
                        HostRequest.status == HostRequestStatus.accepted,
                        HostRequest.status == HostRequestStatus.confirmed,
                    )
                )
                # RespondHostRequest and SendHostRequestMessage treat `end_time < now()` as "in the
                # past", so an active request is one whose end time has not passed yet
                statement = statement.where(HostRequest.end_time >= func.now())

            # fetch one extra row to find out whether there is another page
            statement = statement.order_by(Message.id.desc()).limit(pagination + 1)
            results = session.execute(statement).all()

            host_requests = [
                requests_pb2.HostRequest(
                    host_request_id=result.HostRequest.conversation_id,
                    surfer_user_id=result.HostRequest.surfer_user_id,
                    host_user_id=result.HostRequest.host_user_id,
                    status=hostrequeststatus2api[result.HostRequest.status],
                    created=Timestamp_from_datetime(result.Conversation.created),
                    from_date=date_to_api(result.HostRequest.from_date),
                    to_date=date_to_api(result.HostRequest.to_date),
                    last_seen_message_id=(
                        result.HostRequest.surfer_last_seen_message_id
                        if context.user_id == result.HostRequest.surfer_user_id
                        else result.HostRequest.host_last_seen_message_id
                    ),
                    latest_message=message_to_pb(result.Message),
                )
                for result in results[:pagination]
            ]
            last_request_id = (
                min(g.Message.id for g in results[:pagination]) if len(results) > pagination else 0
            )  # TODO
            no_more = len(results) <= pagination

            return requests_pb2.ListHostRequestsRes(
                last_request_id=last_request_id, no_more=no_more, host_requests=host_requests
            )

    def RespondHostRequest(self, request, context):
        """
        Changes the status of a host request on behalf of the calling participant.

        Transitions enforced here:
        - accepted: host only; not from cancelled, confirmed or accepted
        - rejected: host only; not from cancelled or rejected
        - confirmed: surfer only; only from accepted
        - cancelled: surfer only; not from rejected or cancelled
        Setting the status to pending is never allowed, and requests whose end time has passed
        cannot be changed. A control message recording the change is added to the conversation,
        followed by a text message if request.text is set, and the other party is notified.
        """
        with session_scope() as session:
            host_request = session.execute(
                select(HostRequest)
                .where_users_column_visible(context, HostRequest.surfer_user_id)
                .where_users_column_visible(context, HostRequest.host_user_id)
                .where(HostRequest.conversation_id == request.host_request_id)
            ).scalar_one_or_none()

            if not host_request:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

            # non-participants get the same error as for a non-existent request
            if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

            if request.status == conversations_pb2.HOST_REQUEST_STATUS_PENDING:
                context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)

            if host_request.end_time < now():
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.HOST_REQUEST_IN_PAST)

            control_message = Message()

            if request.status == conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED:
                # only host can accept
                if context.user_id != host_request.host_user_id:
                    context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.NOT_THE_HOST)
                # can't accept a cancelled or confirmed request (only reject), or already accepted
                if (
                    host_request.status == HostRequestStatus.cancelled
                    or host_request.status == HostRequestStatus.confirmed
                    or host_request.status == HostRequestStatus.accepted
                ):
                    context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
                control_message.host_request_status_target = HostRequestStatus.accepted
                host_request.status = HostRequestStatus.accepted
                session.flush()

                notify(
                    user_id=host_request.surfer_user_id,
                    topic_action="host_request:accept",
                    key=host_request.conversation_id,
                    data=notification_data_pb2.HostRequestAccept(
                        host_request=host_request_to_pb(host_request, session, context),
                        host=user_model_to_pb(host_request.host, session, context),
                    ),
                )

            elif request.status == conversations_pb2.HOST_REQUEST_STATUS_REJECTED:
                # only host can reject
                if context.user_id != host_request.host_user_id:
                    context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
                # can't reject a cancelled or already rejected request
                if (
                    host_request.status == HostRequestStatus.cancelled
                    or host_request.status == HostRequestStatus.rejected
                ):
                    context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
                control_message.host_request_status_target = HostRequestStatus.rejected
                host_request.status = HostRequestStatus.rejected
                session.flush()

                notify(
                    user_id=host_request.surfer_user_id,
                    topic_action="host_request:reject",
                    key=host_request.conversation_id,
                    data=notification_data_pb2.HostRequestReject(
                        host_request=host_request_to_pb(host_request, session, context),
                        host=user_model_to_pb(host_request.host, session, context),
                    ),
                )

            elif request.status == conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED:
                # only surfer can confirm
                if context.user_id != host_request.surfer_user_id:
                    context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
                # can only confirm an accepted request
                if host_request.status != HostRequestStatus.accepted:
                    context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
                control_message.host_request_status_target = HostRequestStatus.confirmed
                host_request.status = HostRequestStatus.confirmed
                session.flush()

                notify(
                    user_id=host_request.host_user_id,
                    topic_action="host_request:confirm",
                    key=host_request.conversation_id,
                    data=notification_data_pb2.HostRequestConfirm(
                        host_request=host_request_to_pb(host_request, session, context),
                        surfer=user_model_to_pb(host_request.surfer, session, context),
                    ),
                )

            elif request.status == conversations_pb2.HOST_REQUEST_STATUS_CANCELLED:
                # only surfer can cancel
                if context.user_id != host_request.surfer_user_id:
                    context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
                # can't cancel an already cancelled or rejected request
                if (
                    host_request.status == HostRequestStatus.rejected
                    or host_request.status == HostRequestStatus.cancelled
                ):
                    context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
                control_message.host_request_status_target = HostRequestStatus.cancelled
                host_request.status = HostRequestStatus.cancelled
                session.flush()

                notify(
                    user_id=host_request.host_user_id,
                    topic_action="host_request:cancel",
                    key=host_request.conversation_id,
                    data=notification_data_pb2.HostRequestCancel(
                        host_request=host_request_to_pb(host_request, session, context),
                        surfer=user_model_to_pb(host_request.surfer, session, context),
                    ),
                )

            control_message.message_type = MessageType.host_request_status_changed
            control_message.conversation_id = host_request.conversation_id
            control_message.author_id = context.user_id
            session.add(control_message)

            if request.text:
                latest_message = Message()
                latest_message.conversation_id = host_request.conversation_id
                latest_message.text = request.text
                latest_message.author_id = context.user_id
                latest_message.message_type = MessageType.text
                session.add(latest_message)
            else:
                latest_message = control_message

            # flush so that latest_message.id is populated
            session.flush()

            # the responder has seen everything up to the message(s) they just produced
            if host_request.surfer_user_id == context.user_id:
                host_request.surfer_last_seen_message_id = latest_message.id
            else:
                host_request.host_last_seen_message_id = latest_message.id
            session.commit()

            return empty_pb2.Empty()

    def GetHostRequestMessages(self, request, context):
        """
        Returns a page of messages of a host request, newest first.

        Only messages with an id below request.last_message_id are returned (0 = start from the
        newest). Aborts with NOT_FOUND if the request does not exist or the caller is not a
        participant.
        """
        with session_scope() as session:
            host_request = session.execute(
                select(HostRequest).where(HostRequest.conversation_id == request.host_request_id)
            ).scalar_one_or_none()

            if not host_request:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

            if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

            pagination = self._page_size(request.number)

            # fetch one extra row to find out whether there is another page
            messages = (
                session.execute(
                    select(Message)
                    .where(Message.conversation_id == host_request.conversation_id)
                    .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
                    .order_by(Message.id.desc())
                    .limit(pagination + 1)
                )
                .scalars()
                .all()
            )

            no_more = len(messages) <= pagination

            last_message_id = min(m.id if m else 1 for m in messages[:pagination]) if len(messages) > 0 else 0

            return requests_pb2.GetHostRequestMessagesRes(
                last_message_id=last_message_id,
                no_more=no_more,
                messages=[message_to_pb(message) for message in messages[:pagination]],
            )

    def SendHostRequestMessage(self, request, context):
        """
        Adds a text message from the caller to a host request's conversation and notifies the
        other participant.

        Aborts with INVALID_ARGUMENT for an empty text or a request whose end time has passed,
        NOT_FOUND if the request does not exist or the caller is not a participant, and
        PERMISSION_DENIED if the request has been rejected or cancelled.
        """
        if request.text == "":
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE)
        with session_scope() as session:
            host_request = session.execute(
                select(HostRequest).where(HostRequest.conversation_id == request.host_request_id)
            ).scalar_one_or_none()

            if not host_request:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

            if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

            if host_request.status == HostRequestStatus.rejected or host_request.status == HostRequestStatus.cancelled:
                context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.HOST_REQUEST_CLOSED)

            if host_request.end_time < now():
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.HOST_REQUEST_IN_PAST)

            message = Message()
            message.conversation_id = host_request.conversation_id
            message.author_id = context.user_id
            message.message_type = MessageType.text
            message.text = request.text
            session.add(message)
            # flush so that message.id is populated
            session.flush()

            # the sender has seen their own message; the notification goes to the other party
            sent_by_surfer = host_request.surfer_user_id == context.user_id
            if sent_by_surfer:
                host_request.surfer_last_seen_message_id = message.id
                recipient_user_id = host_request.host_user_id
                sender = host_request.surfer
            else:
                host_request.host_last_seen_message_id = message.id
                recipient_user_id = host_request.surfer_user_id
                sender = host_request.host

            notify(
                user_id=recipient_user_id,
                topic_action="host_request:message",
                key=host_request.conversation_id,
                data=notification_data_pb2.HostRequestMessage(
                    host_request=host_request_to_pb(host_request, session, context),
                    user=user_model_to_pb(sender, session, context),
                    text=request.text,
                    # set when the recipient of the notification is the host
                    am_host=sent_by_surfer,
                ),
            )

            session.commit()

            return empty_pb2.Empty()

    def GetHostRequestUpdates(self, request, context):
        """
        Returns messages newer than request.newest_message_id across the caller's host requests,
        oldest first, each paired with the current status of its host request.

        request.only_sent / request.only_received restrict the requests considered (setting both
        aborts with INVALID_ARGUMENT). Aborts with INVALID_ARGUMENT if newest_message_id is 0 or
        does not refer to an existing message.
        """
        if request.only_sent and request.only_received:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.HOST_REQUEST_SENT_OR_RECEIVED)

        with session_scope() as session:
            if request.newest_message_id == 0:
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE)

            if not session.execute(select(Message).where(Message.id == request.newest_message_id)).scalar_one_or_none():
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE)

            pagination = self._page_size(request.number)

            statement = (
                select(
                    Message,
                    HostRequest.status.label("host_request_status"),
                    HostRequest.conversation_id.label("host_request_id"),
                )
                .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
                .where(Message.id > request.newest_message_id)
            )

            if request.only_sent:
                statement = statement.where(HostRequest.surfer_user_id == context.user_id)
            elif request.only_received:
                statement = statement.where(HostRequest.host_user_id == context.user_id)
            else:
                statement = statement.where(
                    or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id)
                )

            # fetch one extra row to find out whether there is another page
            statement = statement.order_by(Message.id.asc()).limit(pagination + 1)
            res = session.execute(statement).all()

            no_more = len(res) <= pagination

            return requests_pb2.GetHostRequestUpdatesRes(
                no_more=no_more,
                updates=[
                    requests_pb2.HostRequestUpdate(
                        host_request_id=result.host_request_id,
                        status=hostrequeststatus2api[result.host_request_status],
                        message=message_to_pb(result.Message),
                    )
                    for result in res[:pagination]
                ],
            )

    def MarkLastSeenHostRequest(self, request, context):
        """
        Advances the caller's "last seen message" marker on a host request.

        Aborts with NOT_FOUND if the request does not exist or the caller is not a participant,
        and with FAILED_PRECONDITION if the new marker is lower than the current one.
        """
        with session_scope() as session:
            host_request = session.execute(
                select(HostRequest).where(HostRequest.conversation_id == request.host_request_id)
            ).scalar_one_or_none()

            if not host_request:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

            if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

            if host_request.surfer_user_id == context.user_id:
                # the marker may only move forwards
                if host_request.surfer_last_seen_message_id > request.last_seen_message_id:
                    context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNSEE_MESSAGES)
                host_request.surfer_last_seen_message_id = request.last_seen_message_id
            else:
                if host_request.host_last_seen_message_id > request.last_seen_message_id:
                    context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNSEE_MESSAGES)
                host_request.host_last_seen_message_id = request.last_seen_message_id

            session.commit()
            return empty_pb2.Empty()

    def GetResponseRate(self, request, context):
        """
        Summarises how often and how quickly request.user_id responds to host requests they
        receive.

        Returns insufficient_data when the aggregate row count is below 3; otherwise one of
        low (rate <= 0.33), some (<= 0.66), most (<= 0.90) or almost_all, with the 33rd (and for
        most/almost_all also the 66th) percentile response time rounded to the nearest minute.
        Aborts with NOT_FOUND if the user does not exist or is not visible.
        """
        with session_scope() as session:
            # this subquery gets the time that the request was sent
            t = (
                select(Message.conversation_id, Message.time)
                .where(Message.message_type == MessageType.chat_created)
                .subquery()
            )
            # this subquery gets the time that the user responded to the request
            s = (
                select(Message.conversation_id, func.min(Message.time).label("time"))
                .where(Message.author_id == request.user_id)
                .group_by(Message.conversation_id)
                .subquery()
            )

            res = session.execute(
                select(
                    User.id,
                    # number of requests received
                    func.count().label("n"),
                    # percentage of requests responded to
                    (func.count(s.c.time) / func.cast(func.greatest(func.count(t.c.time), 1.0), Float)).label(
                        "response_rate"
                    ),
                    # the 33rd percentile response time; unanswered requests count as 1000 days
                    percentile_disc(0.33)
                    .within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000)))
                    .label("response_time_p33"),
                    # the 66th percentile response time; unanswered requests count as 1000 days
                    percentile_disc(0.66)
                    .within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000)))
                    .label("response_time_p66"),
                )
                .where_users_visible(context)
                .where(User.id == request.user_id)
                .outerjoin(HostRequest, HostRequest.host_user_id == User.id)
                .outerjoin(t, t.c.conversation_id == HostRequest.conversation_id)
                .outerjoin(s, s.c.conversation_id == HostRequest.conversation_id)
                .group_by(User.id)
            ).one_or_none()

            if not res:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

            _, n, response_rate, response_time_p33, response_time_p66 = res

            def coarsen(delta):
                # round to the nearest whole minute before exposing the value
                return Duration_from_timedelta(timedelta(seconds=round(delta.total_seconds() / 60) * 60))

            if n < 3:
                return requests_pb2.GetResponseRateRes(
                    insufficient_data=requests_pb2.ResponseRateInsufficientData(),
                )

            if response_rate <= 0.33:
                return requests_pb2.GetResponseRateRes(
                    low=requests_pb2.ResponseRateLow(),
                )

            response_time_p33_coarsened = coarsen(response_time_p33)

            if response_rate <= 0.66:
                return requests_pb2.GetResponseRateRes(
                    some=requests_pb2.ResponseRateSome(response_time_p33=response_time_p33_coarsened),
                )

            response_time_p66_coarsened = coarsen(response_time_p66)

            if response_rate <= 0.90:
                return requests_pb2.GetResponseRateRes(
                    most=requests_pb2.ResponseRateMost(
                        response_time_p33=response_time_p33_coarsened, response_time_p66=response_time_p66_coarsened
                    ),
                )
            else:
                return requests_pb2.GetResponseRateRes(
                    almost_all=requests_pb2.ResponseRateAlmostAll(
                        response_time_p33=response_time_p33_coarsened, response_time_p66=response_time_p66_coarsened
                    ),
                )