Coverage for src/couchers/servicers/requests.py: 93%

270 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-10-15 13:03 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy import Float 

7from sqlalchemy.orm import aliased 

8from sqlalchemy.sql import and_, func, or_ 

9from sqlalchemy.sql.functions import percentile_disc 

10 

11from couchers import errors 

12from couchers.metrics import ( 

13 account_age_on_host_request_create_histogram, 

14 host_request_first_response_histogram, 

15 host_request_responses_counter, 

16 host_requests_sent_counter, 

17 sent_messages_counter, 

18) 

19from couchers.models import Conversation, HostRequest, HostRequestStatus, Message, MessageType, User 

20from couchers.notifications.notify import notify 

21from couchers.servicers.api import user_model_to_pb 

22from couchers.sql import couchers_select as select 

23from couchers.utils import ( 

24 Duration_from_timedelta, 

25 Timestamp_from_datetime, 

26 date_to_api, 

27 now, 

28 parse_date, 

29 today_in_timezone, 

30) 

31from proto import conversations_pb2, notification_data_pb2, requests_pb2, requests_pb2_grpc 

32 

# module-level logger, named after this module
logger = logging.getLogger(__name__)

# page size used when the client does not ask for a positive number of items
DEFAULT_PAGINATION_LENGTH = 10
# upper bound applied to whatever page size the client asks for
MAX_PAGE_SIZE = 50


# translation table from the database-side HostRequestStatus enum to the protobuf enum values
hostrequeststatus2api = {
    HostRequestStatus.pending: conversations_pb2.HOST_REQUEST_STATUS_PENDING,
    HostRequestStatus.accepted: conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
    HostRequestStatus.rejected: conversations_pb2.HOST_REQUEST_STATUS_REJECTED,
    HostRequestStatus.confirmed: conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED,
    HostRequestStatus.cancelled: conversations_pb2.HOST_REQUEST_STATUS_CANCELLED,
}

46 

47 

def message_to_pb(message: Message):
    """
    Converts a message database row into a ``conversations_pb2.Message``

    A normal message carries its text. Any other message is treated as a control message: ``chat_created`` or
    ``host_request_status_changed`` is filled in depending on ``message.message_type``, and the content that
    does not apply is passed as ``None``.
    """
    # fields shared by every kind of message
    common = dict(
        message_id=message.id,
        author_user_id=message.author_id,
        time=Timestamp_from_datetime(message.time),
    )

    if message.is_normal_message:
        return conversations_pb2.Message(
            text=conversations_pb2.MessageContentText(text=message.text),
            **common,
        )

    chat_created = None
    if message.message_type == MessageType.chat_created:
        chat_created = conversations_pb2.MessageContentChatCreated()

    status_changed = None
    if message.message_type == MessageType.host_request_status_changed:
        status_changed = conversations_pb2.MessageContentHostRequestStatusChanged(
            status=hostrequeststatus2api[message.host_request_status_target]
        )

    return conversations_pb2.Message(
        chat_created=chat_created,
        host_request_status_changed=status_changed,
        **common,
    )

77 

78 

def host_request_to_pb(host_request: HostRequest, session, context):
    """
    Builds the ``requests_pb2.HostRequest`` for the given host request, from the point of view of the calling user

    ``created`` is the time of the first message in the request's conversation and ``latest_message`` is the
    message with the highest id in it. ``last_seen_message_id`` is read from the surfer's column when
    ``context.user_id`` is the surfer, and from the host's column otherwise.
    """
    in_conversation = select(Message).where(Message.conversation_id == host_request.conversation_id)

    first_message = session.execute(in_conversation.order_by(Message.id.asc()).limit(1)).scalar_one()
    newest_message = session.execute(in_conversation.order_by(Message.id.desc()).limit(1)).scalar_one()

    if context.user_id == host_request.surfer_user_id:
        last_seen_message_id = host_request.surfer_last_seen_message_id
    else:
        last_seen_message_id = host_request.host_last_seen_message_id

    return requests_pb2.HostRequest(
        host_request_id=host_request.conversation_id,
        surfer_user_id=host_request.surfer_user_id,
        host_user_id=host_request.host_user_id,
        status=hostrequeststatus2api[host_request.status],
        created=Timestamp_from_datetime(first_message.time),
        from_date=date_to_api(host_request.from_date),
        to_date=date_to_api(host_request.to_date),
        last_seen_message_id=last_seen_message_id,
        latest_message=message_to_pb(newest_message),
    )

109 

110 

def _possibly_observe_first_response_time(session, host_request, user_id, response_type):
    """
    Records the host's first-response time in the metrics histogram, if this is the host's first response

    ``user_id`` must be the host of ``host_request``. The host counts as not having responded yet when they
    have authored no message in the request's conversation; the callers in this file invoke this before adding
    the host's new message. The observed value is the number of seconds since the conversation was created,
    labelled with the host's gender, the surfer's gender and ``response_type``.
    """
    assert host_request.host_user_id == user_id

    # if this is the first response then there's nothing by this user yet
    messages_by_host = session.execute(
        select(func.count())
        .where(Message.conversation_id == host_request.conversation_id)
        .where(Message.author_id == user_id)
    ).scalar_one_or_none()

    if messages_by_host != 0:
        return

    def gender_of(some_user_id):
        return session.execute(select(User.gender).where(User.id == some_user_id)).scalar_one()

    host_gender = gender_of(host_request.host_user_id)
    surfer_gender = gender_of(host_request.surfer_user_id)
    seconds_to_respond = (now() - host_request.conversation.created).total_seconds()
    host_request_first_response_histogram.labels(host_gender, surfer_gender, response_type).observe(
        seconds_to_respond
    )

127 

128 

class Requests(requests_pb2_grpc.RequestsServicer):
    """
    gRPC servicer for host requests: creating, fetching, listing and responding to them, plus their messages

    Every handler takes the protobuf ``request``, the gRPC ``context`` (``context.user_id`` is the calling user)
    and a database ``session``. The handlers rely on ``context.abort`` raising, so code after an abort call
    assumes the check passed.
    """

    def CreateHostRequest(self, request, context, session):
        """
        Creates a new pending host request from the calling user (the surfer) to ``request.host_user_id``

        A conversation is created holding a ``chat_created`` control message followed by the request text, the
        host is notified, and the sending metrics are updated.

        Aborts with FAILED_PRECONDITION if the caller's profile is incomplete, NOT_FOUND if the host does not
        exist or is not visible to the caller, and INVALID_ARGUMENT if the caller requests themselves or the
        dates are invalid.
        """
        user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
        if not user.has_completed_profile:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.INCOMPLETE_PROFILE_SEND_REQUEST)

        if request.host_user_id == context.user_id:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.CANT_REQUEST_SELF)

        # just to check host exists and is visible
        host = session.execute(
            select(User).where_users_visible(context).where(User.id == request.host_user_id)
        ).scalar_one_or_none()
        if not host:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        from_date = parse_date(request.from_date)
        to_date = parse_date(request.to_date)

        if not from_date or not to_date:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_DATE)

        # "today" is evaluated in the host's timezone
        today = today_in_timezone(host.timezone)

        # request starts from the past
        if from_date < today:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_FROM_BEFORE_TODAY)

        # the stay has to end strictly after it starts
        if from_date >= to_date:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_FROM_AFTER_TO)

        # No need to check today > to_date: at this point today <= from_date < to_date

        if from_date - today > timedelta(days=365):
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_FROM_AFTER_ONE_YEAR)

        if to_date - from_date > timedelta(days=365):
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_TO_AFTER_ONE_YEAR)

        conversation = Conversation()
        session.add(conversation)
        # flush so that conversation.id is available
        session.flush()

        session.add(
            Message(
                conversation_id=conversation.id,
                author_id=context.user_id,
                message_type=MessageType.chat_created,
            )
        )

        message = Message(
            conversation_id=conversation.id,
            author_id=context.user_id,
            text=request.text,
            message_type=MessageType.text,
        )
        session.add(message)
        # flush so that message.id is available
        session.flush()

        host_request = HostRequest(
            conversation_id=conversation.id,
            surfer_user_id=context.user_id,
            host_user_id=host.id,
            from_date=from_date,
            to_date=to_date,
            status=HostRequestStatus.pending,
            # the surfer wrote the text message, so they have seen it
            surfer_last_seen_message_id=message.id,
            # TODO: tz
            # timezone=host.timezone,
        )
        session.add(host_request)
        session.commit()

        notify(
            session,
            user_id=host_request.host_user_id,
            topic_action="host_request:create",
            key=host_request.conversation_id,
            data=notification_data_pb2.HostRequestCreate(
                host_request=host_request_to_pb(host_request, session, context),
                surfer=user_model_to_pb(host_request.surfer, session, context),
                text=request.text,
            ),
        )

        host_requests_sent_counter.labels(user.gender, host.gender).inc()
        sent_messages_counter.labels(user.gender, "host request send").inc()
        account_age_on_host_request_create_histogram.labels(user.gender, host.gender).observe(
            (now() - user.joined).total_seconds()
        )

        return requests_pb2.CreateHostRequestRes(host_request_id=host_request.conversation_id)

    def GetHostRequest(self, request, context, session):
        """
        Returns a single host request in which the calling user is the surfer or the host

        Aborts with NOT_FOUND if there is no such request, the caller is not part of it, or the other party is
        not visible to the caller.
        """
        host_request = session.execute(
            select(HostRequest)
            .where_users_column_visible(context, HostRequest.surfer_user_id)
            .where_users_column_visible(context, HostRequest.host_user_id)
            .where(HostRequest.conversation_id == request.host_request_id)
            .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id))
        ).scalar_one_or_none()

        if not host_request:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

        return host_request_to_pb(host_request, session, context)

    def ListHostRequests(self, request, context, session):
        """
        Lists the calling user's host requests, most recently active first, one page at a time

        Requests are ordered by the id of their latest message, and that id is also the pagination cursor:
        pass the returned ``last_request_id`` back to get the next page (0 means start from the top).
        ``only_sent`` / ``only_received`` restrict the list to requests where the caller is the surfer / host.
        ``only_active`` keeps only pending, accepted or confirmed requests that have not ended yet.

        Aborts with INVALID_ARGUMENT if both ``only_sent`` and ``only_received`` are set.
        """
        if request.only_sent and request.only_received:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.HOST_REQUEST_SENT_OR_RECEIVED)

        pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
        pagination = min(pagination, MAX_PAGE_SIZE)

        # By outer joining messages on itself where the second id is bigger, only the highest IDs will have
        # none as message_2.id. So just filter for these ones to get highest messages only.
        # See https://stackoverflow.com/a/27802817/6115336
        message_2 = aliased(Message)
        statement = (
            select(Message, HostRequest, Conversation)
            .outerjoin(message_2, and_(Message.conversation_id == message_2.conversation_id, Message.id < message_2.id))
            .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
            .join(Conversation, Conversation.id == HostRequest.conversation_id)
            .where_users_column_visible(context, HostRequest.surfer_user_id)
            .where_users_column_visible(context, HostRequest.host_user_id)
            .where(message_2.id.is_(None))
            .where(or_(Message.id < request.last_request_id, request.last_request_id == 0))
        )

        if request.only_sent:
            statement = statement.where(HostRequest.surfer_user_id == context.user_id)
        elif request.only_received:
            statement = statement.where(HostRequest.host_user_id == context.user_id)
        else:
            statement = statement.where(
                or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id)
            )

        # TODO: I considered having the latest control message be the single source of truth for
        # the HostRequest.status, but decided against it because of this filter.
        # Another possibility is to filter in the python instead of SQL, but that's slower
        if request.only_active:
            statement = statement.where(
                or_(
                    HostRequest.status == HostRequestStatus.pending,
                    HostRequest.status == HostRequestStatus.accepted,
                    HostRequest.status == HostRequestStatus.confirmed,
                )
            )
            # A request is in the past once end_time < now() (see RespondHostRequest), so the active ones are
            # those with end_time >= now(). The previous `<=` comparison kept only requests that had ended.
            statement = statement.where(HostRequest.end_time >= func.now())

        # fetch one extra row to find out whether there is another page
        statement = statement.order_by(Message.id.desc()).limit(pagination + 1)
        results = session.execute(statement).all()
        page = results[:pagination]

        host_requests = [
            requests_pb2.HostRequest(
                host_request_id=result.HostRequest.conversation_id,
                surfer_user_id=result.HostRequest.surfer_user_id,
                host_user_id=result.HostRequest.host_user_id,
                status=hostrequeststatus2api[result.HostRequest.status],
                created=Timestamp_from_datetime(result.Conversation.created),
                from_date=date_to_api(result.HostRequest.from_date),
                to_date=date_to_api(result.HostRequest.to_date),
                last_seen_message_id=(
                    result.HostRequest.surfer_last_seen_message_id
                    if context.user_id == result.HostRequest.surfer_user_id
                    else result.HostRequest.host_last_seen_message_id
                ),
                latest_message=message_to_pb(result.Message),
            )
            for result in page
        ]
        no_more = len(results) <= pagination
        # the cursor is only meaningful when there is a next page
        last_request_id = 0 if no_more else min(result.Message.id for result in page)  # TODO

        return requests_pb2.ListHostRequestsRes(
            last_request_id=last_request_id, no_more=no_more, host_requests=host_requests
        )

    def RespondHostRequest(self, request, context, session):
        """
        Changes the status of a host request and records the change as a control message

        The host may accept or reject; the surfer may confirm (only an accepted request) or cancel. The other
        party is notified, and ``request.text``, if given, is stored as a text message after the control message.
        The caller's last seen message id is moved to the newest message written here.

        Aborts with NOT_FOUND if the request does not exist, is not visible, or the caller is not part of it;
        PERMISSION_DENIED if the caller may not make this transition; INVALID_ARGUMENT if the request has
        already ended or the status value is not recognised.
        """

        def count_host_response(other_user_id, response_type):
            # bump the response metrics, labelled by the genders of the caller and of the other party
            user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
            other_gender = session.execute(select(User.gender).where(User.id == other_user_id)).scalar_one()
            host_request_responses_counter.labels(user_gender, other_gender, response_type).inc()
            sent_messages_counter.labels(user_gender, "host request response").inc()

        host_request = session.execute(
            select(HostRequest)
            .where_users_column_visible(context, HostRequest.surfer_user_id)
            .where_users_column_visible(context, HostRequest.host_user_id)
            .where(HostRequest.conversation_id == request.host_request_id)
        ).scalar_one_or_none()

        if not host_request:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

        if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

        # nobody can move a request back to pending
        if request.status == conversations_pb2.HOST_REQUEST_STATUS_PENDING:
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)

        if host_request.end_time < now():
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.HOST_REQUEST_IN_PAST)

        control_message = Message()

        # NOTE: in every branch notify() runs before the control message is added to the session, so the
        # latest_message embedded in the notification is the one that preceded this response.
        if request.status == conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED:
            # only host can accept
            if context.user_id != host_request.host_user_id:
                context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.NOT_THE_HOST)
            # can't accept a cancelled or confirmed request (only reject), or already accepted
            if host_request.status in (
                HostRequestStatus.cancelled,
                HostRequestStatus.confirmed,
                HostRequestStatus.accepted,
            ):
                context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
            _possibly_observe_first_response_time(session, host_request, context.user_id, "accepted")
            control_message.host_request_status_target = HostRequestStatus.accepted
            host_request.status = HostRequestStatus.accepted
            session.flush()

            notify(
                session,
                user_id=host_request.surfer_user_id,
                topic_action="host_request:accept",
                key=host_request.conversation_id,
                data=notification_data_pb2.HostRequestAccept(
                    host_request=host_request_to_pb(host_request, session, context),
                    host=user_model_to_pb(host_request.host, session, context),
                ),
            )

            count_host_response(host_request.surfer_user_id, "accepted")

        elif request.status == conversations_pb2.HOST_REQUEST_STATUS_REJECTED:
            # only host can reject
            if context.user_id != host_request.host_user_id:
                context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
            # can't reject a cancelled or already rejected request
            if host_request.status in (HostRequestStatus.cancelled, HostRequestStatus.rejected):
                context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
            _possibly_observe_first_response_time(session, host_request, context.user_id, "rejected")
            control_message.host_request_status_target = HostRequestStatus.rejected
            host_request.status = HostRequestStatus.rejected
            session.flush()

            notify(
                session,
                user_id=host_request.surfer_user_id,
                topic_action="host_request:reject",
                key=host_request.conversation_id,
                data=notification_data_pb2.HostRequestReject(
                    host_request=host_request_to_pb(host_request, session, context),
                    host=user_model_to_pb(host_request.host, session, context),
                ),
            )

            count_host_response(host_request.surfer_user_id, "rejected")

        elif request.status == conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED:
            # only surfer can confirm
            if context.user_id != host_request.surfer_user_id:
                context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
            # can only confirm an accepted request
            if host_request.status != HostRequestStatus.accepted:
                context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
            control_message.host_request_status_target = HostRequestStatus.confirmed
            host_request.status = HostRequestStatus.confirmed
            session.flush()

            notify(
                session,
                user_id=host_request.host_user_id,
                topic_action="host_request:confirm",
                key=host_request.conversation_id,
                data=notification_data_pb2.HostRequestConfirm(
                    host_request=host_request_to_pb(host_request, session, context),
                    surfer=user_model_to_pb(host_request.surfer, session, context),
                ),
            )

            count_host_response(host_request.host_user_id, "confirmed")

        elif request.status == conversations_pb2.HOST_REQUEST_STATUS_CANCELLED:
            # only surfer can cancel
            if context.user_id != host_request.surfer_user_id:
                context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
            # can't cancel an already cancelled or rejected request
            if host_request.status in (HostRequestStatus.rejected, HostRequestStatus.cancelled):
                context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
            control_message.host_request_status_target = HostRequestStatus.cancelled
            host_request.status = HostRequestStatus.cancelled
            session.flush()

            notify(
                session,
                user_id=host_request.host_user_id,
                topic_action="host_request:cancel",
                key=host_request.conversation_id,
                data=notification_data_pb2.HostRequestCancel(
                    host_request=host_request_to_pb(host_request, session, context),
                    surfer=user_model_to_pb(host_request.surfer, session, context),
                ),
            )

            count_host_response(host_request.host_user_id, "cancelled")

        else:
            # An unrecognised status value would otherwise be stored as a status-changed control message
            # without a target status, which message_to_pb cannot convert afterwards.
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_HOST_REQUEST_STATUS)

        control_message.message_type = MessageType.host_request_status_changed
        control_message.conversation_id = host_request.conversation_id
        control_message.author_id = context.user_id
        session.add(control_message)

        if request.text:
            latest_message = Message()
            latest_message.conversation_id = host_request.conversation_id
            latest_message.text = request.text
            latest_message.author_id = context.user_id
            latest_message.message_type = MessageType.text
            session.add(latest_message)
        else:
            latest_message = control_message

        # flush so that latest_message.id is available
        session.flush()

        if host_request.surfer_user_id == context.user_id:
            host_request.surfer_last_seen_message_id = latest_message.id
        else:
            host_request.host_last_seen_message_id = latest_message.id
        session.commit()

        return empty_pb2.Empty()

    def GetHostRequestMessages(self, request, context, session):
        """
        Returns one page of the messages of a host request, newest first

        ``request.last_message_id`` is the pagination cursor: only messages with a smaller id are returned, and
        0 means start from the newest message. The returned ``last_message_id`` is the smallest id on the page,
        or 0 if the page is empty.

        Aborts with NOT_FOUND if the request does not exist or the caller is not part of it.
        """
        host_request = session.execute(
            select(HostRequest).where(HostRequest.conversation_id == request.host_request_id)
        ).scalar_one_or_none()

        if not host_request:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

        if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

        pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
        pagination = min(pagination, MAX_PAGE_SIZE)

        # fetch one extra row to find out whether there is another page
        messages = (
            session.execute(
                select(Message)
                .where(Message.conversation_id == host_request.conversation_id)
                .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
                .order_by(Message.id.desc())
                .limit(pagination + 1)
            )
            .scalars()
            .all()
        )
        page = messages[:pagination]

        no_more = len(messages) <= pagination

        # pagination is at least 1, so the page is empty only when there are no messages at all
        last_message_id = min(message.id for message in page) if page else 0

        return requests_pb2.GetHostRequestMessagesRes(
            last_message_id=last_message_id,
            no_more=no_more,
            messages=[message_to_pb(message) for message in page],
        )

    def SendHostRequestMessage(self, request, context, session):
        """
        Adds a text message to a host request's conversation and notifies the other party

        The sender's last seen message id is moved to the new message. If the sender is the host, their
        first-response time is recorded before the message is added.

        Aborts with INVALID_ARGUMENT if the text is empty or the request has already ended, NOT_FOUND if the
        request does not exist or the caller is not part of it, and PERMISSION_DENIED if the request has been
        rejected or cancelled.
        """
        if request.text == "":
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE)
        host_request = session.execute(
            select(HostRequest).where(HostRequest.conversation_id == request.host_request_id)
        ).scalar_one_or_none()

        if not host_request:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

        if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

        if host_request.status == HostRequestStatus.rejected or host_request.status == HostRequestStatus.cancelled:
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.HOST_REQUEST_CLOSED)

        if host_request.end_time < now():
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.HOST_REQUEST_IN_PAST)

        # has to happen before the message is added: it checks that the host has written nothing so far
        if host_request.host_user_id == context.user_id:
            _possibly_observe_first_response_time(session, host_request, context.user_id, "message")

        message = Message()
        message.conversation_id = host_request.conversation_id
        message.author_id = context.user_id
        message.message_type = MessageType.text
        message.text = request.text
        session.add(message)
        # flush so that message.id is available
        session.flush()

        sent_by_surfer = host_request.surfer_user_id == context.user_id

        if sent_by_surfer:
            host_request.surfer_last_seen_message_id = message.id
        else:
            host_request.host_last_seen_message_id = message.id

        notify(
            session,
            # the notification goes to whoever did not write the message
            user_id=host_request.host_user_id if sent_by_surfer else host_request.surfer_user_id,
            topic_action="host_request:message",
            key=host_request.conversation_id,
            data=notification_data_pb2.HostRequestMessage(
                host_request=host_request_to_pb(host_request, session, context),
                # the user attached to the notification is the sender
                user=user_model_to_pb(
                    host_request.surfer if sent_by_surfer else host_request.host, session, context
                ),
                text=request.text,
                # true when the recipient of the notification is the host
                am_host=sent_by_surfer,
            ),
        )

        session.commit()

        user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
        sent_messages_counter.labels(user_gender, "host request").inc()

        return empty_pb2.Empty()

    def GetHostRequestUpdates(self, request, context, session):
        """
        Returns messages newer than ``request.newest_message_id`` from the caller's host requests, oldest first

        Each update carries the host request id, the request's current status and the message. ``only_sent`` /
        ``only_received`` restrict the updates to requests where the caller is the surfer / host.

        Aborts with INVALID_ARGUMENT if both ``only_sent`` and ``only_received`` are set, or if
        ``newest_message_id`` is 0 or does not refer to an existing message.
        """
        if request.only_sent and request.only_received:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.HOST_REQUEST_SENT_OR_RECEIVED)

        if request.newest_message_id == 0:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE)

        if not session.execute(select(Message).where(Message.id == request.newest_message_id)).scalar_one_or_none():
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE)

        pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
        pagination = min(pagination, MAX_PAGE_SIZE)

        statement = (
            select(
                Message,
                HostRequest.status.label("host_request_status"),
                HostRequest.conversation_id.label("host_request_id"),
            )
            .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
            .where(Message.id > request.newest_message_id)
        )

        if request.only_sent:
            statement = statement.where(HostRequest.surfer_user_id == context.user_id)
        elif request.only_received:
            statement = statement.where(HostRequest.host_user_id == context.user_id)
        else:
            statement = statement.where(
                or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id)
            )

        # fetch one extra row to find out whether there is another page
        statement = statement.order_by(Message.id.asc()).limit(pagination + 1)
        res = session.execute(statement).all()

        no_more = len(res) <= pagination

        return requests_pb2.GetHostRequestUpdatesRes(
            no_more=no_more,
            updates=[
                requests_pb2.HostRequestUpdate(
                    host_request_id=result.host_request_id,
                    status=hostrequeststatus2api[result.host_request_status],
                    message=message_to_pb(result.Message),
                )
                for result in res[:pagination]
            ],
        )

    def MarkLastSeenHostRequest(self, request, context, session):
        """
        Moves the caller's last seen message id for a host request forward to ``request.last_seen_message_id``

        Aborts with NOT_FOUND if the request does not exist or the caller is not part of it, and with
        FAILED_PRECONDITION if the new id is lower than the currently stored one.
        """
        host_request = session.execute(
            select(HostRequest).where(HostRequest.conversation_id == request.host_request_id)
        ).scalar_one_or_none()

        if not host_request:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

        if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

        if host_request.surfer_user_id == context.user_id:
            # the marker may only move forwards
            if host_request.surfer_last_seen_message_id > request.last_seen_message_id:
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNSEE_MESSAGES)
            host_request.surfer_last_seen_message_id = request.last_seen_message_id
        else:
            if host_request.host_last_seen_message_id > request.last_seen_message_id:
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNSEE_MESSAGES)
            host_request.host_last_seen_message_id = request.last_seen_message_id

        session.commit()
        return empty_pb2.Empty()

    def GetResponseRate(self, request, context, session):
        """
        Returns a coarse summary of how often and how quickly ``request.user_id`` responds to host requests

        The result is one of: ``insufficient_data`` (fewer than 3 rows counted), ``low`` (rate at most 0.33),
        ``some`` (at most 0.66, with the 33rd percentile response time), ``most`` (at most 0.90, with the 33rd
        and 66th percentiles) or ``almost_all`` (above 0.90, with both percentiles). Response times are rounded
        to whole minutes.

        Aborts with NOT_FOUND if the user does not exist or is not visible to the caller.
        """
        # this subquery gets the time that the request was sent
        t = (
            select(Message.conversation_id, Message.time)
            .where(Message.message_type == MessageType.chat_created)
            .subquery()
        )
        # this subquery gets the time that the user responded to the request
        s = (
            select(Message.conversation_id, func.min(Message.time).label("time"))
            .where(Message.author_id == request.user_id)
            .group_by(Message.conversation_id)
            .subquery()
        )

        def response_time_percentile(fraction):
            # requests without a response are counted as having taken 1000 days
            return percentile_disc(fraction).within_group(
                func.coalesce(s.c.time - t.c.time, timedelta(days=1000))
            )

        def coarsen(response_time):
            # round to the nearest minute
            return Duration_from_timedelta(timedelta(seconds=round(response_time.total_seconds() / 60) * 60))

        res = session.execute(
            select(
                User.id,
                # number of requests received
                func.count().label("n"),
                # percentage of requests responded to
                (func.count(s.c.time) / func.cast(func.greatest(func.count(t.c.time), 1.0), Float)).label(
                    "response_rate"
                ),
                # the 33rd percentile response time
                response_time_percentile(0.33).label("response_time_p33"),
                # the 66th percentile response time
                response_time_percentile(0.66).label("response_time_p66"),
            )
            .where_users_visible(context)
            .where(User.id == request.user_id)
            .outerjoin(HostRequest, HostRequest.host_user_id == User.id)
            .outerjoin(t, t.c.conversation_id == HostRequest.conversation_id)
            .outerjoin(s, s.c.conversation_id == HostRequest.conversation_id)
            .group_by(User.id)
        ).one_or_none()

        if not res:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        _, n, response_rate, response_time_p33, response_time_p66 = res

        if n < 3:
            return requests_pb2.GetResponseRateRes(
                insufficient_data=requests_pb2.ResponseRateInsufficientData(),
            )

        if response_rate <= 0.33:
            return requests_pb2.GetResponseRateRes(
                low=requests_pb2.ResponseRateLow(),
            )

        response_time_p33_coarsened = coarsen(response_time_p33)

        if response_rate <= 0.66:
            return requests_pb2.GetResponseRateRes(
                some=requests_pb2.ResponseRateSome(response_time_p33=response_time_p33_coarsened),
            )

        response_time_p66_coarsened = coarsen(response_time_p66)

        if response_rate <= 0.90:
            return requests_pb2.GetResponseRateRes(
                most=requests_pb2.ResponseRateMost(
                    response_time_p33=response_time_p33_coarsened, response_time_p66=response_time_p66_coarsened
                ),
            )

        return requests_pb2.GetResponseRateRes(
            almost_all=requests_pb2.ResponseRateAlmostAll(
                response_time_p33=response_time_p33_coarsened, response_time_p66=response_time_p66_coarsened
            ),
        )