Coverage for src/couchers/servicers/requests.py: 93%

265 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-07-03 04:21 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy.orm import aliased 

7from sqlalchemy.sql import and_, func, or_ 

8 

9from couchers import errors 

10from couchers.materialized_views import user_response_rates 

11from couchers.metrics import ( 

12 account_age_on_host_request_create_histogram, 

13 host_request_first_response_histogram, 

14 host_request_responses_counter, 

15 host_requests_sent_counter, 

16 sent_messages_counter, 

17) 

18from couchers.models import Conversation, HostRequest, HostRequestStatus, Message, MessageType, User 

19from couchers.notifications.notify import notify 

20from couchers.servicers.api import response_rate_to_pb, user_model_to_pb 

21from couchers.sql import couchers_select as select 

22from couchers.utils import ( 

23 Timestamp_from_datetime, 

24 date_to_api, 

25 now, 

26 parse_date, 

27 today_in_timezone, 

28) 

29from proto import conversations_pb2, notification_data_pb2, requests_pb2, requests_pb2_grpc 

30 

31logger = logging.getLogger(__name__) 

32 

33DEFAULT_PAGINATION_LENGTH = 10 

34MAX_PAGE_SIZE = 50 

35 

36 

37hostrequeststatus2api = { 

38 HostRequestStatus.pending: conversations_pb2.HOST_REQUEST_STATUS_PENDING, 

39 HostRequestStatus.accepted: conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED, 

40 HostRequestStatus.rejected: conversations_pb2.HOST_REQUEST_STATUS_REJECTED, 

41 HostRequestStatus.confirmed: conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED, 

42 HostRequestStatus.cancelled: conversations_pb2.HOST_REQUEST_STATUS_CANCELLED, 

43} 

44 

45 

46def message_to_pb(message: Message): 

47 """ 

48 Turns the given message to a protocol buffer 

49 """ 

50 if message.is_normal_message: 

51 return conversations_pb2.Message( 

52 message_id=message.id, 

53 author_user_id=message.author_id, 

54 time=Timestamp_from_datetime(message.time), 

55 text=conversations_pb2.MessageContentText(text=message.text), 

56 ) 

57 else: 

58 return conversations_pb2.Message( 

59 message_id=message.id, 

60 author_user_id=message.author_id, 

61 time=Timestamp_from_datetime(message.time), 

62 chat_created=( 

63 conversations_pb2.MessageContentChatCreated() 

64 if message.message_type == MessageType.chat_created 

65 else None 

66 ), 

67 host_request_status_changed=( 

68 conversations_pb2.MessageContentHostRequestStatusChanged( 

69 status=hostrequeststatus2api[message.host_request_status_target] 

70 ) 

71 if message.message_type == MessageType.host_request_status_changed 

72 else None 

73 ), 

74 ) 

75 

76 

77def host_request_to_pb(host_request: HostRequest, session, context): 

78 initial_message = session.execute( 

79 select(Message) 

80 .where(Message.conversation_id == host_request.conversation_id) 

81 .order_by(Message.id.asc()) 

82 .limit(1) 

83 ).scalar_one() 

84 

85 latest_message = session.execute( 

86 select(Message) 

87 .where(Message.conversation_id == host_request.conversation_id) 

88 .order_by(Message.id.desc()) 

89 .limit(1) 

90 ).scalar_one() 

91 

92 return requests_pb2.HostRequest( 

93 host_request_id=host_request.conversation_id, 

94 surfer_user_id=host_request.surfer_user_id, 

95 host_user_id=host_request.host_user_id, 

96 status=hostrequeststatus2api[host_request.status], 

97 created=Timestamp_from_datetime(initial_message.time), 

98 from_date=date_to_api(host_request.from_date), 

99 to_date=date_to_api(host_request.to_date), 

100 last_seen_message_id=( 

101 host_request.surfer_last_seen_message_id 

102 if context.user_id == host_request.surfer_user_id 

103 else host_request.host_last_seen_message_id 

104 ), 

105 latest_message=message_to_pb(latest_message), 

106 ) 

107 

108 

109def _possibly_observe_first_response_time(session, host_request, user_id, response_type): 

110 # if this is the first response then there's nothing by this user yet 

111 assert host_request.host_user_id == user_id 

112 

113 number_messages_by_host = session.execute( 

114 select(func.count()) 

115 .where(Message.conversation_id == host_request.conversation_id) 

116 .where(Message.author_id == user_id) 

117 ).scalar_one_or_none() 

118 

119 if number_messages_by_host == 0: 

120 host_gender = session.execute(select(User.gender).where(User.id == host_request.host_user_id)).scalar_one() 

121 surfer_gender = session.execute(select(User.gender).where(User.id == host_request.surfer_user_id)).scalar_one() 

122 host_request_first_response_histogram.labels(host_gender, surfer_gender, response_type).observe( 

123 (now() - host_request.conversation.created).total_seconds() 

124 ) 

125 

126 

127class Requests(requests_pb2_grpc.RequestsServicer): 

128 def CreateHostRequest(self, request, context, session): 

129 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

130 if not user.has_completed_profile: 

131 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.INCOMPLETE_PROFILE_SEND_REQUEST) 

132 

133 if request.host_user_id == context.user_id: 

134 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.CANT_REQUEST_SELF) 

135 

136 # just to check host exists and is visible 

137 host = session.execute( 

138 select(User).where_users_visible(context).where(User.id == request.host_user_id) 

139 ).scalar_one_or_none() 

140 if not host: 

141 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

142 

143 from_date = parse_date(request.from_date) 

144 to_date = parse_date(request.to_date) 

145 

146 if not from_date or not to_date: 

147 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_DATE) 

148 

149 today = today_in_timezone(host.timezone) 

150 

151 # request starts from the past 

152 if from_date < today: 

153 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_FROM_BEFORE_TODAY) 

154 

155 # from_date is not >= to_date 

156 if from_date >= to_date: 

157 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_FROM_AFTER_TO) 

158 

159 # No need to check today > to_date 

160 

161 if from_date - today > timedelta(days=365): 

162 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_FROM_AFTER_ONE_YEAR) 

163 

164 if to_date - from_date > timedelta(days=365): 

165 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_TO_AFTER_ONE_YEAR) 

166 

167 conversation = Conversation() 

168 session.add(conversation) 

169 session.flush() 

170 

171 session.add( 

172 Message( 

173 conversation_id=conversation.id, 

174 author_id=context.user_id, 

175 message_type=MessageType.chat_created, 

176 ) 

177 ) 

178 

179 message = Message( 

180 conversation_id=conversation.id, 

181 author_id=context.user_id, 

182 text=request.text, 

183 message_type=MessageType.text, 

184 ) 

185 session.add(message) 

186 session.flush() 

187 

188 host_request = HostRequest( 

189 conversation_id=conversation.id, 

190 surfer_user_id=context.user_id, 

191 host_user_id=host.id, 

192 from_date=from_date, 

193 to_date=to_date, 

194 status=HostRequestStatus.pending, 

195 surfer_last_seen_message_id=message.id, 

196 # TODO: tz 

197 # timezone=host.timezone, 

198 ) 

199 session.add(host_request) 

200 session.commit() 

201 

202 notify( 

203 session, 

204 user_id=host_request.host_user_id, 

205 topic_action="host_request:create", 

206 key=host_request.conversation_id, 

207 data=notification_data_pb2.HostRequestCreate( 

208 host_request=host_request_to_pb(host_request, session, context), 

209 surfer=user_model_to_pb(host_request.surfer, session, context), 

210 text=request.text, 

211 ), 

212 ) 

213 

214 host_requests_sent_counter.labels(user.gender, host.gender).inc() 

215 sent_messages_counter.labels(user.gender, "host request send").inc() 

216 account_age_on_host_request_create_histogram.labels(user.gender, host.gender).observe( 

217 (now() - user.joined).total_seconds() 

218 ) 

219 

220 return requests_pb2.CreateHostRequestRes(host_request_id=host_request.conversation_id) 

221 

222 def GetHostRequest(self, request, context, session): 

223 host_request = session.execute( 

224 select(HostRequest) 

225 .where_users_column_visible(context, HostRequest.surfer_user_id) 

226 .where_users_column_visible(context, HostRequest.host_user_id) 

227 .where(HostRequest.conversation_id == request.host_request_id) 

228 .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id)) 

229 ).scalar_one_or_none() 

230 

231 if not host_request: 

232 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND) 

233 

234 return host_request_to_pb(host_request, session, context) 

235 

236 def ListHostRequests(self, request, context, session): 

237 if request.only_sent and request.only_received: 

238 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.HOST_REQUEST_SENT_OR_RECEIVED) 

239 

240 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH 

241 pagination = min(pagination, MAX_PAGE_SIZE) 

242 

243 # By outer joining messages on itself where the second id is bigger, only the highest IDs will have 

244 # none as message_2.id. So just filter for these ones to get highest messages only. 

245 # See https://stackoverflow.com/a/27802817/6115336 

246 message_2 = aliased(Message) 

247 statement = ( 

248 select(Message, HostRequest, Conversation) 

249 .outerjoin(message_2, and_(Message.conversation_id == message_2.conversation_id, Message.id < message_2.id)) 

250 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id) 

251 .join(Conversation, Conversation.id == HostRequest.conversation_id) 

252 .where_users_column_visible(context, HostRequest.surfer_user_id) 

253 .where_users_column_visible(context, HostRequest.host_user_id) 

254 .where(message_2.id == None) 

255 .where(or_(Message.id < request.last_request_id, request.last_request_id == 0)) 

256 ) 

257 

258 if request.only_sent: 

259 statement = statement.where(HostRequest.surfer_user_id == context.user_id) 

260 elif request.only_received: 

261 statement = statement.where(HostRequest.host_user_id == context.user_id) 

262 elif request.HasField("only_archived"): 

263 statement = statement.where( 

264 or_( 

265 and_( 

266 HostRequest.surfer_user_id == context.user_id, 

267 HostRequest.is_surfer_archived == request.only_archived, 

268 ), 

269 and_( 

270 HostRequest.host_user_id == context.user_id, 

271 HostRequest.is_host_archived == request.only_archived, 

272 ), 

273 ) 

274 ) 

275 else: 

276 statement = statement.where( 

277 or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id) 

278 ) 

279 

280 # TODO: I considered having the latest control message be the single source of truth for 

281 # the HostRequest.status, but decided against it because of this filter. 

282 # Another possibility is to filter in the python instead of SQL, but that's slower 

283 if request.only_active: 

284 statement = statement.where( 

285 or_( 

286 HostRequest.status == HostRequestStatus.pending, 

287 HostRequest.status == HostRequestStatus.accepted, 

288 HostRequest.status == HostRequestStatus.confirmed, 

289 ) 

290 ) 

291 statement = statement.where(HostRequest.end_time <= func.now()) 

292 

293 statement = statement.order_by(Message.id.desc()).limit(pagination + 1) 

294 results = session.execute(statement).all() 

295 

296 host_requests = [ 

297 requests_pb2.HostRequest( 

298 host_request_id=result.HostRequest.conversation_id, 

299 surfer_user_id=result.HostRequest.surfer_user_id, 

300 host_user_id=result.HostRequest.host_user_id, 

301 status=hostrequeststatus2api[result.HostRequest.status], 

302 created=Timestamp_from_datetime(result.Conversation.created), 

303 from_date=date_to_api(result.HostRequest.from_date), 

304 to_date=date_to_api(result.HostRequest.to_date), 

305 last_seen_message_id=( 

306 result.HostRequest.surfer_last_seen_message_id 

307 if context.user_id == result.HostRequest.surfer_user_id 

308 else result.HostRequest.host_last_seen_message_id 

309 ), 

310 latest_message=message_to_pb(result.Message), 

311 ) 

312 for result in results[:pagination] 

313 ] 

314 last_request_id = min(g.Message.id for g in results[:pagination]) if len(results) > pagination else 0 # TODO 

315 no_more = len(results) <= pagination 

316 

317 return requests_pb2.ListHostRequestsRes( 

318 last_request_id=last_request_id, no_more=no_more, host_requests=host_requests 

319 ) 

320 

321 def RespondHostRequest(self, request, context, session): 

322 def count_host_response(other_user_id, response_type): 

323 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one() 

324 other_gender = session.execute(select(User.gender).where(User.id == other_user_id)).scalar_one() 

325 host_request_responses_counter.labels(user_gender, other_gender, response_type).inc() 

326 sent_messages_counter.labels(user_gender, "host request response").inc() 

327 

328 host_request = session.execute( 

329 select(HostRequest) 

330 .where_users_column_visible(context, HostRequest.surfer_user_id) 

331 .where_users_column_visible(context, HostRequest.host_user_id) 

332 .where(HostRequest.conversation_id == request.host_request_id) 

333 ).scalar_one_or_none() 

334 

335 if not host_request: 

336 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND) 

337 

338 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 

339 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND) 

340 

341 if request.status == conversations_pb2.HOST_REQUEST_STATUS_PENDING: 

342 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS) 

343 

344 if host_request.end_time < now(): 

345 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.HOST_REQUEST_IN_PAST) 

346 

347 control_message = Message() 

348 

349 if request.status == conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED: 

350 # only host can accept 

351 if context.user_id != host_request.host_user_id: 

352 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.NOT_THE_HOST) 

353 # can't accept a cancelled or confirmed request (only reject), or already accepted 

354 if ( 

355 host_request.status == HostRequestStatus.cancelled 

356 or host_request.status == HostRequestStatus.confirmed 

357 or host_request.status == HostRequestStatus.accepted 

358 ): 

359 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS) 

360 _possibly_observe_first_response_time(session, host_request, context.user_id, "accepted") 

361 control_message.host_request_status_target = HostRequestStatus.accepted 

362 host_request.status = HostRequestStatus.accepted 

363 session.flush() 

364 

365 notify( 

366 session, 

367 user_id=host_request.surfer_user_id, 

368 topic_action="host_request:accept", 

369 key=host_request.conversation_id, 

370 data=notification_data_pb2.HostRequestAccept( 

371 host_request=host_request_to_pb(host_request, session, context), 

372 host=user_model_to_pb(host_request.host, session, context), 

373 ), 

374 ) 

375 

376 count_host_response(host_request.surfer_user_id, "accepted") 

377 

378 if request.status == conversations_pb2.HOST_REQUEST_STATUS_REJECTED: 

379 # only host can reject 

380 if context.user_id != host_request.host_user_id: 

381 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS) 

382 # can't reject a cancelled or already rejected request 

383 if host_request.status == HostRequestStatus.cancelled or host_request.status == HostRequestStatus.rejected: 

384 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS) 

385 _possibly_observe_first_response_time(session, host_request, context.user_id, "rejected") 

386 control_message.host_request_status_target = HostRequestStatus.rejected 

387 host_request.status = HostRequestStatus.rejected 

388 session.flush() 

389 

390 notify( 

391 session, 

392 user_id=host_request.surfer_user_id, 

393 topic_action="host_request:reject", 

394 key=host_request.conversation_id, 

395 data=notification_data_pb2.HostRequestReject( 

396 host_request=host_request_to_pb(host_request, session, context), 

397 host=user_model_to_pb(host_request.host, session, context), 

398 ), 

399 ) 

400 

401 count_host_response(host_request.surfer_user_id, "rejected") 

402 

403 if request.status == conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED: 

404 # only surfer can confirm 

405 if context.user_id != host_request.surfer_user_id: 

406 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS) 

407 # can only confirm an accepted request 

408 if host_request.status != HostRequestStatus.accepted: 

409 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS) 

410 control_message.host_request_status_target = HostRequestStatus.confirmed 

411 host_request.status = HostRequestStatus.confirmed 

412 session.flush() 

413 

414 notify( 

415 session, 

416 user_id=host_request.host_user_id, 

417 topic_action="host_request:confirm", 

418 key=host_request.conversation_id, 

419 data=notification_data_pb2.HostRequestConfirm( 

420 host_request=host_request_to_pb(host_request, session, context), 

421 surfer=user_model_to_pb(host_request.surfer, session, context), 

422 ), 

423 ) 

424 

425 count_host_response(host_request.host_user_id, "confirmed") 

426 

427 if request.status == conversations_pb2.HOST_REQUEST_STATUS_CANCELLED: 

428 # only surfer can cancel 

429 if context.user_id != host_request.surfer_user_id: 

430 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS) 

431 # can't' cancel an already cancelled or rejected request 

432 if host_request.status == HostRequestStatus.rejected or host_request.status == HostRequestStatus.cancelled: 

433 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS) 

434 control_message.host_request_status_target = HostRequestStatus.cancelled 

435 host_request.status = HostRequestStatus.cancelled 

436 session.flush() 

437 

438 notify( 

439 session, 

440 user_id=host_request.host_user_id, 

441 topic_action="host_request:cancel", 

442 key=host_request.conversation_id, 

443 data=notification_data_pb2.HostRequestCancel( 

444 host_request=host_request_to_pb(host_request, session, context), 

445 surfer=user_model_to_pb(host_request.surfer, session, context), 

446 ), 

447 ) 

448 

449 count_host_response(host_request.host_user_id, "cancelled") 

450 

451 control_message.message_type = MessageType.host_request_status_changed 

452 control_message.conversation_id = host_request.conversation_id 

453 control_message.author_id = context.user_id 

454 session.add(control_message) 

455 

456 if request.text: 

457 latest_message = Message() 

458 latest_message.conversation_id = host_request.conversation_id 

459 latest_message.text = request.text 

460 latest_message.author_id = context.user_id 

461 latest_message.message_type = MessageType.text 

462 session.add(latest_message) 

463 else: 

464 latest_message = control_message 

465 

466 session.flush() 

467 

468 if host_request.surfer_user_id == context.user_id: 

469 host_request.surfer_last_seen_message_id = latest_message.id 

470 else: 

471 host_request.host_last_seen_message_id = latest_message.id 

472 session.commit() 

473 

474 return empty_pb2.Empty() 

475 

476 def GetHostRequestMessages(self, request, context, session): 

477 host_request = session.execute( 

478 select(HostRequest).where(HostRequest.conversation_id == request.host_request_id) 

479 ).scalar_one_or_none() 

480 

481 if not host_request: 

482 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND) 

483 

484 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 

485 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND) 

486 

487 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH 

488 pagination = min(pagination, MAX_PAGE_SIZE) 

489 

490 messages = ( 

491 session.execute( 

492 select(Message) 

493 .where(Message.conversation_id == host_request.conversation_id) 

494 .where(or_(Message.id < request.last_message_id, request.last_message_id == 0)) 

495 .order_by(Message.id.desc()) 

496 .limit(pagination + 1) 

497 ) 

498 .scalars() 

499 .all() 

500 ) 

501 

502 no_more = len(messages) <= pagination 

503 

504 last_message_id = min(m.id if m else 1 for m in messages[:pagination]) if len(messages) > 0 else 0 

505 

506 return requests_pb2.GetHostRequestMessagesRes( 

507 last_message_id=last_message_id, 

508 no_more=no_more, 

509 messages=[message_to_pb(message) for message in messages[:pagination]], 

510 ) 

511 

512 def SendHostRequestMessage(self, request, context, session): 

513 if request.text == "": 

514 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE) 

515 host_request = session.execute( 

516 select(HostRequest).where(HostRequest.conversation_id == request.host_request_id) 

517 ).scalar_one_or_none() 

518 

519 if not host_request: 

520 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND) 

521 

522 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 

523 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND) 

524 

525 if host_request.status == HostRequestStatus.rejected or host_request.status == HostRequestStatus.cancelled: 

526 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.HOST_REQUEST_CLOSED) 

527 

528 if host_request.host_user_id == context.user_id: 

529 _possibly_observe_first_response_time(session, host_request, context.user_id, "message") 

530 

531 message = Message() 

532 message.conversation_id = host_request.conversation_id 

533 message.author_id = context.user_id 

534 message.message_type = MessageType.text 

535 message.text = request.text 

536 session.add(message) 

537 session.flush() 

538 

539 if host_request.surfer_user_id == context.user_id: 

540 host_request.surfer_last_seen_message_id = message.id 

541 

542 notify( 

543 session, 

544 user_id=host_request.host_user_id, 

545 topic_action="host_request:message", 

546 key=host_request.conversation_id, 

547 data=notification_data_pb2.HostRequestMessage( 

548 host_request=host_request_to_pb(host_request, session, context), 

549 user=user_model_to_pb(host_request.surfer, session, context), 

550 text=request.text, 

551 am_host=True, 

552 ), 

553 ) 

554 

555 else: 

556 host_request.host_last_seen_message_id = message.id 

557 

558 notify( 

559 session, 

560 user_id=host_request.surfer_user_id, 

561 topic_action="host_request:message", 

562 key=host_request.conversation_id, 

563 data=notification_data_pb2.HostRequestMessage( 

564 host_request=host_request_to_pb(host_request, session, context), 

565 user=user_model_to_pb(host_request.host, session, context), 

566 text=request.text, 

567 am_host=False, 

568 ), 

569 ) 

570 

571 session.commit() 

572 

573 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one() 

574 sent_messages_counter.labels(user_gender, "host request").inc() 

575 

576 return empty_pb2.Empty() 

577 

578 def GetHostRequestUpdates(self, request, context, session): 

579 if request.only_sent and request.only_received: 

580 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.HOST_REQUEST_SENT_OR_RECEIVED) 

581 

582 if request.newest_message_id == 0: 

583 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE) 

584 

585 if not session.execute(select(Message).where(Message.id == request.newest_message_id)).scalar_one_or_none(): 

586 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE) 

587 

588 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH 

589 pagination = min(pagination, MAX_PAGE_SIZE) 

590 

591 statement = ( 

592 select( 

593 Message, 

594 HostRequest.status.label("host_request_status"), 

595 HostRequest.conversation_id.label("host_request_id"), 

596 ) 

597 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id) 

598 .where(Message.id > request.newest_message_id) 

599 ) 

600 

601 if request.only_sent: 

602 statement = statement.where(HostRequest.surfer_user_id == context.user_id) 

603 elif request.only_received: 

604 statement = statement.where(HostRequest.host_user_id == context.user_id) 

605 else: 

606 statement = statement.where( 

607 or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id) 

608 ) 

609 

610 statement = statement.order_by(Message.id.asc()).limit(pagination + 1) 

611 res = session.execute(statement).all() 

612 

613 no_more = len(res) <= pagination 

614 

615 last_message_id = min(m.Message.id if m else 1 for m in res[:pagination]) if len(res) > 0 else 0 # TODO 

616 

617 return requests_pb2.GetHostRequestUpdatesRes( 

618 no_more=no_more, 

619 updates=[ 

620 requests_pb2.HostRequestUpdate( 

621 host_request_id=result.host_request_id, 

622 status=hostrequeststatus2api[result.host_request_status], 

623 message=message_to_pb(result.Message), 

624 ) 

625 for result in res[:pagination] 

626 ], 

627 ) 

628 

629 def MarkLastSeenHostRequest(self, request, context, session): 

630 host_request = session.execute( 

631 select(HostRequest).where(HostRequest.conversation_id == request.host_request_id) 

632 ).scalar_one_or_none() 

633 

634 if not host_request: 

635 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND) 

636 

637 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 

638 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND) 

639 

640 if host_request.surfer_user_id == context.user_id: 

641 if not host_request.surfer_last_seen_message_id <= request.last_seen_message_id: 

642 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNSEE_MESSAGES) 

643 host_request.surfer_last_seen_message_id = request.last_seen_message_id 

644 else: 

645 if not host_request.host_last_seen_message_id <= request.last_seen_message_id: 

646 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNSEE_MESSAGES) 

647 host_request.host_last_seen_message_id = request.last_seen_message_id 

648 

649 session.commit() 

650 return empty_pb2.Empty() 

651 

652 def SetHostRequestArchiveStatus(self, request, context, session): 

653 host_request: HostRequest = session.execute( 

654 select(HostRequest) 

655 .where(HostRequest.conversation_id == request.host_request_id) 

656 .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id)) 

657 ).scalar_one_or_none() 

658 

659 if not host_request: 

660 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND) 

661 

662 if context.user_id == host_request.surfer_user_id: 

663 host_request.is_surfer_archived = request.is_archived 

664 else: 

665 host_request.is_host_archived = request.is_archived 

666 

667 return requests_pb2.SetHostRequestArchiveStatusRes( 

668 host_request_id=host_request.conversation_id, 

669 is_archived=request.is_archived, 

670 ) 

671 

672 def GetResponseRate(self, request, context, session): 

673 user_res = session.execute( 

674 select(User.id, user_response_rates) 

675 .outerjoin(user_response_rates, user_response_rates.c.user_id == User.id) 

676 .where_users_visible(context) 

677 .where(User.id == request.user_id) 

678 ).one_or_none() 

679 

680 # if user doesn't exist, return None 

681 if not user_res: 

682 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

683 

684 user, *response_rates = user_res 

685 return requests_pb2.GetResponseRateRes(**response_rate_to_pb(response_rates))