Coverage for src/couchers/servicers/requests.py: 93%
284 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-21 02:54 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-21 02:54 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy import exists
7from sqlalchemy.orm import aliased
8from sqlalchemy.sql import and_, func, or_
10from couchers.materialized_views import UserResponseRate
11from couchers.metrics import (
12 account_age_on_host_request_create_histogram,
13 host_request_first_response_histogram,
14 host_request_responses_counter,
15 host_requests_sent_counter,
16 sent_messages_counter,
17)
18from couchers.models import (
19 Conversation,
20 HostRequest,
21 HostRequestFeedback,
22 HostRequestQuality,
23 HostRequestStatus,
24 Message,
25 MessageType,
26 RateLimitAction,
27 User,
28)
29from couchers.notifications.notify import notify
30from couchers.proto import conversations_pb2, notification_data_pb2, requests_pb2, requests_pb2_grpc
31from couchers.rate_limits.check import process_rate_limits_and_check_abort
32from couchers.rate_limits.definitions import RATE_LIMIT_HOURS
33from couchers.servicers.api import response_rate_to_pb, user_model_to_pb
34from couchers.sql import couchers_select as select
35from couchers.utils import (
36 Timestamp_from_datetime,
37 date_to_api,
38 get_coordinates,
39 now,
40 parse_date,
41 today_in_timezone,
42)
44logger = logging.getLogger(__name__)
46DEFAULT_PAGINATION_LENGTH = 10
47MAX_PAGE_SIZE = 50
50hostrequeststatus2api = {
51 HostRequestStatus.pending: conversations_pb2.HOST_REQUEST_STATUS_PENDING,
52 HostRequestStatus.accepted: conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
53 HostRequestStatus.rejected: conversations_pb2.HOST_REQUEST_STATUS_REJECTED,
54 HostRequestStatus.confirmed: conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED,
55 HostRequestStatus.cancelled: conversations_pb2.HOST_REQUEST_STATUS_CANCELLED,
56}
58hostrequestquality2sql = {
59 requests_pb2.HOST_REQUEST_QUALITY_UNSPECIFIED: HostRequestQuality.high_quality,
60 requests_pb2.HOST_REQUEST_QUALITY_LOW: HostRequestQuality.okay_quality,
61 requests_pb2.HOST_REQUEST_QUALITY_OKAY: HostRequestQuality.low_quality,
62}
65def message_to_pb(message: Message):
66 """
67 Turns the given message to a protocol buffer
68 """
69 if message.is_normal_message:
70 return conversations_pb2.Message(
71 message_id=message.id,
72 author_user_id=message.author_id,
73 time=Timestamp_from_datetime(message.time),
74 text=conversations_pb2.MessageContentText(text=message.text),
75 )
76 else:
77 return conversations_pb2.Message(
78 message_id=message.id,
79 author_user_id=message.author_id,
80 time=Timestamp_from_datetime(message.time),
81 chat_created=(
82 conversations_pb2.MessageContentChatCreated()
83 if message.message_type == MessageType.chat_created
84 else None
85 ),
86 host_request_status_changed=(
87 conversations_pb2.MessageContentHostRequestStatusChanged(
88 status=hostrequeststatus2api[message.host_request_status_target]
89 )
90 if message.message_type == MessageType.host_request_status_changed
91 else None
92 ),
93 )
96def host_request_to_pb(host_request: HostRequest, session, context):
97 initial_message = session.execute(
98 select(Message)
99 .where(Message.conversation_id == host_request.conversation_id)
100 .order_by(Message.id.asc())
101 .limit(1)
102 ).scalar_one()
104 latest_message = session.execute(
105 select(Message)
106 .where(Message.conversation_id == host_request.conversation_id)
107 .order_by(Message.id.desc())
108 .limit(1)
109 ).scalar_one()
111 lat, lng = get_coordinates(host_request.hosting_location)
113 need_feedback = False
114 if context.user_id == host_request.host_user_id and host_request.status == HostRequestStatus.rejected:
115 need_feedback = not session.execute(
116 select(
117 exists().where(
118 HostRequestFeedback.from_user_id == context.user_id,
119 HostRequestFeedback.host_request_id == host_request.conversation_id,
120 )
121 )
122 ).scalar_one()
124 return requests_pb2.HostRequest(
125 host_request_id=host_request.conversation_id,
126 surfer_user_id=host_request.surfer_user_id,
127 host_user_id=host_request.host_user_id,
128 status=hostrequeststatus2api[host_request.status],
129 created=Timestamp_from_datetime(initial_message.time),
130 from_date=date_to_api(host_request.from_date),
131 to_date=date_to_api(host_request.to_date),
132 last_seen_message_id=(
133 host_request.surfer_last_seen_message_id
134 if context.user_id == host_request.surfer_user_id
135 else host_request.host_last_seen_message_id
136 ),
137 latest_message=message_to_pb(latest_message),
138 hosting_city=host_request.hosting_city,
139 hosting_lat=lat,
140 hosting_lng=lng,
141 hosting_radius=host_request.hosting_radius,
142 need_host_request_feedback=need_feedback,
143 )
146def _possibly_observe_first_response_time(session, host_request, user_id, response_type):
147 # if this is the first response then there's nothing by this user yet
148 assert host_request.host_user_id == user_id
150 number_messages_by_host = session.execute(
151 select(func.count())
152 .where(Message.conversation_id == host_request.conversation_id)
153 .where(Message.author_id == user_id)
154 ).scalar_one_or_none()
156 if number_messages_by_host == 0:
157 host_gender = session.execute(select(User.gender).where(User.id == host_request.host_user_id)).scalar_one()
158 surfer_gender = session.execute(select(User.gender).where(User.id == host_request.surfer_user_id)).scalar_one()
159 host_request_first_response_histogram.labels(host_gender, surfer_gender, response_type).observe(
160 (now() - host_request.conversation.created).total_seconds()
161 )
164class Requests(requests_pb2_grpc.RequestsServicer):
165 def CreateHostRequest(self, request, context, session):
166 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
167 if not user.has_completed_profile:
168 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_request")
170 if request.host_user_id == context.user_id:
171 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_request_self")
173 # just to check host exists and is visible
174 host = session.execute(
175 select(User).where_users_visible(context).where(User.id == request.host_user_id)
176 ).scalar_one_or_none()
177 if not host:
178 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
180 from_date = parse_date(request.from_date)
181 to_date = parse_date(request.to_date)
183 if not from_date or not to_date:
184 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_date")
186 today = today_in_timezone(host.timezone)
188 # request starts from the past
189 if from_date < today:
190 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_before_today")
192 # from_date is not >= to_date
193 if from_date >= to_date:
194 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_to")
196 # No need to check today > to_date
198 if from_date - today > timedelta(days=365):
199 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_one_year")
201 if to_date - from_date > timedelta(days=365):
202 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_to_after_one_year")
204 # Check if user has been sending host requests excessively
205 if process_rate_limits_and_check_abort(
206 session=session, user_id=context.user_id, action=RateLimitAction.host_request
207 ):
208 context.abort_with_error_code(
209 grpc.StatusCode.RESOURCE_EXHAUSTED,
210 "host_request_rate_limit",
211 substitutions={"hours": RATE_LIMIT_HOURS},
212 )
214 conversation = Conversation()
215 session.add(conversation)
216 session.flush()
218 session.add(
219 Message(
220 conversation_id=conversation.id,
221 author_id=context.user_id,
222 message_type=MessageType.chat_created,
223 )
224 )
226 message = Message(
227 conversation_id=conversation.id,
228 author_id=context.user_id,
229 text=request.text,
230 message_type=MessageType.text,
231 )
232 session.add(message)
233 session.flush()
235 host_request = HostRequest(
236 conversation_id=conversation.id,
237 surfer_user_id=context.user_id,
238 host_user_id=host.id,
239 from_date=from_date,
240 to_date=to_date,
241 status=HostRequestStatus.pending,
242 surfer_last_seen_message_id=message.id,
243 # TODO: tz
244 # timezone=host.timezone,
245 hosting_city=host.city,
246 hosting_location=host.geom,
247 hosting_radius=host.geom_radius,
248 )
249 session.add(host_request)
250 session.commit()
252 notify(
253 session,
254 user_id=host_request.host_user_id,
255 topic_action="host_request:create",
256 key=host_request.conversation_id,
257 data=notification_data_pb2.HostRequestCreate(
258 host_request=host_request_to_pb(host_request, session, context),
259 surfer=user_model_to_pb(host_request.surfer, session, context),
260 text=request.text,
261 ),
262 )
264 host_requests_sent_counter.labels(user.gender, host.gender).inc()
265 sent_messages_counter.labels(user.gender, "host request send").inc()
266 account_age_on_host_request_create_histogram.labels(user.gender, host.gender).observe(
267 (now() - user.joined).total_seconds()
268 )
270 return requests_pb2.CreateHostRequestRes(host_request_id=host_request.conversation_id)
272 def GetHostRequest(self, request, context, session):
273 host_request = session.execute(
274 select(HostRequest)
275 .where_users_column_visible(context, HostRequest.surfer_user_id)
276 .where_users_column_visible(context, HostRequest.host_user_id)
277 .where(HostRequest.conversation_id == request.host_request_id)
278 .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id))
279 ).scalar_one_or_none()
281 if not host_request:
282 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
284 return host_request_to_pb(host_request, session, context)
286 def ListHostRequests(self, request, context, session):
287 if request.only_sent and request.only_received:
288 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received")
290 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
291 pagination = min(pagination, MAX_PAGE_SIZE)
293 # By outer joining messages on itself where the second id is bigger, only the highest IDs will have
294 # none as message_2.id. So just filter for these ones to get highest messages only.
295 # See https://stackoverflow.com/a/27802817/6115336
296 message_2 = aliased(Message)
297 statement = (
298 select(Message, HostRequest, Conversation)
299 .outerjoin(message_2, and_(Message.conversation_id == message_2.conversation_id, Message.id < message_2.id))
300 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
301 .join(Conversation, Conversation.id == HostRequest.conversation_id)
302 .where_users_column_visible(context, HostRequest.surfer_user_id)
303 .where_users_column_visible(context, HostRequest.host_user_id)
304 .where(message_2.id == None)
305 .where(or_(Message.id < request.last_request_id, request.last_request_id == 0))
306 )
308 if request.only_sent:
309 statement = statement.where(HostRequest.surfer_user_id == context.user_id)
310 elif request.only_received:
311 statement = statement.where(HostRequest.host_user_id == context.user_id)
312 elif request.HasField("only_archived"):
313 statement = statement.where(
314 or_(
315 and_(
316 HostRequest.surfer_user_id == context.user_id,
317 HostRequest.is_surfer_archived == request.only_archived,
318 ),
319 and_(
320 HostRequest.host_user_id == context.user_id,
321 HostRequest.is_host_archived == request.only_archived,
322 ),
323 )
324 )
325 else:
326 statement = statement.where(
327 or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id)
328 )
330 # TODO: I considered having the latest control message be the single source of truth for
331 # the HostRequest.status, but decided against it because of this filter.
332 # Another possibility is to filter in the python instead of SQL, but that's slower
333 if request.only_active:
334 statement = statement.where(
335 or_(
336 HostRequest.status == HostRequestStatus.pending,
337 HostRequest.status == HostRequestStatus.accepted,
338 HostRequest.status == HostRequestStatus.confirmed,
339 )
340 )
341 statement = statement.where(HostRequest.end_time <= func.now())
343 statement = statement.order_by(Message.id.desc()).limit(pagination + 1)
344 results = session.execute(statement).all()
346 host_requests = []
347 for result in results[:pagination]:
348 lat, lng = get_coordinates(result.HostRequest.hosting_location)
349 host_requests.append(
350 requests_pb2.HostRequest(
351 host_request_id=result.HostRequest.conversation_id,
352 surfer_user_id=result.HostRequest.surfer_user_id,
353 host_user_id=result.HostRequest.host_user_id,
354 status=hostrequeststatus2api[result.HostRequest.status],
355 created=Timestamp_from_datetime(result.Conversation.created),
356 from_date=date_to_api(result.HostRequest.from_date),
357 to_date=date_to_api(result.HostRequest.to_date),
358 last_seen_message_id=(
359 result.HostRequest.surfer_last_seen_message_id
360 if context.user_id == result.HostRequest.surfer_user_id
361 else result.HostRequest.host_last_seen_message_id
362 ),
363 latest_message=message_to_pb(result.Message),
364 hosting_city=result.HostRequest.hosting_city,
365 hosting_lat=lat,
366 hosting_lng=lng,
367 hosting_radius=result.HostRequest.hosting_radius,
368 )
369 )
371 last_request_id = min(g.Message.id for g in results[:pagination]) if len(results) > pagination else 0 # TODO
372 no_more = len(results) <= pagination
374 return requests_pb2.ListHostRequestsRes(
375 last_request_id=last_request_id, no_more=no_more, host_requests=host_requests
376 )
378 def RespondHostRequest(self, request, context, session):
379 def count_host_response(other_user_id, response_type):
380 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
381 other_gender = session.execute(select(User.gender).where(User.id == other_user_id)).scalar_one()
382 host_request_responses_counter.labels(user_gender, other_gender, response_type).inc()
383 sent_messages_counter.labels(user_gender, "host request response").inc()
385 host_request = session.execute(
386 select(HostRequest)
387 .where_users_column_visible(context, HostRequest.surfer_user_id)
388 .where_users_column_visible(context, HostRequest.host_user_id)
389 .where(HostRequest.conversation_id == request.host_request_id)
390 ).scalar_one_or_none()
392 if not host_request:
393 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
395 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
396 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
398 if request.status == conversations_pb2.HOST_REQUEST_STATUS_PENDING:
399 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
401 if host_request.end_time < now():
402 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_in_past")
404 control_message = Message()
406 if request.status == conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED:
407 # only host can accept
408 if context.user_id != host_request.host_user_id:
409 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "not_the_host")
410 # can't accept a cancelled or confirmed request (only reject), or already accepted
411 if (
412 host_request.status == HostRequestStatus.cancelled
413 or host_request.status == HostRequestStatus.confirmed
414 or host_request.status == HostRequestStatus.accepted
415 ):
416 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
417 _possibly_observe_first_response_time(session, host_request, context.user_id, "accepted")
418 control_message.host_request_status_target = HostRequestStatus.accepted
419 host_request.status = HostRequestStatus.accepted
420 session.flush()
422 notify(
423 session,
424 user_id=host_request.surfer_user_id,
425 topic_action="host_request:accept",
426 key=host_request.conversation_id,
427 data=notification_data_pb2.HostRequestAccept(
428 host_request=host_request_to_pb(host_request, session, context),
429 host=user_model_to_pb(host_request.host, session, context),
430 ),
431 )
433 count_host_response(host_request.surfer_user_id, "accepted")
435 if request.status == conversations_pb2.HOST_REQUEST_STATUS_REJECTED:
436 # only host can reject
437 if context.user_id != host_request.host_user_id:
438 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
439 # can't reject a cancelled or already rejected request
440 if host_request.status == HostRequestStatus.cancelled or host_request.status == HostRequestStatus.rejected:
441 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
442 _possibly_observe_first_response_time(session, host_request, context.user_id, "rejected")
443 control_message.host_request_status_target = HostRequestStatus.rejected
444 host_request.status = HostRequestStatus.rejected
445 session.flush()
447 notify(
448 session,
449 user_id=host_request.surfer_user_id,
450 topic_action="host_request:reject",
451 key=host_request.conversation_id,
452 data=notification_data_pb2.HostRequestReject(
453 host_request=host_request_to_pb(host_request, session, context),
454 host=user_model_to_pb(host_request.host, session, context),
455 ),
456 )
458 count_host_response(host_request.surfer_user_id, "rejected")
460 if request.status == conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED:
461 # only surfer can confirm
462 if context.user_id != host_request.surfer_user_id:
463 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
464 # can only confirm an accepted request
465 if host_request.status != HostRequestStatus.accepted:
466 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
467 control_message.host_request_status_target = HostRequestStatus.confirmed
468 host_request.status = HostRequestStatus.confirmed
469 session.flush()
471 notify(
472 session,
473 user_id=host_request.host_user_id,
474 topic_action="host_request:confirm",
475 key=host_request.conversation_id,
476 data=notification_data_pb2.HostRequestConfirm(
477 host_request=host_request_to_pb(host_request, session, context),
478 surfer=user_model_to_pb(host_request.surfer, session, context),
479 ),
480 )
482 count_host_response(host_request.host_user_id, "confirmed")
484 if request.status == conversations_pb2.HOST_REQUEST_STATUS_CANCELLED:
485 # only surfer can cancel
486 if context.user_id != host_request.surfer_user_id:
487 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
488 # can't' cancel an already cancelled or rejected request
489 if host_request.status == HostRequestStatus.rejected or host_request.status == HostRequestStatus.cancelled:
490 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
491 control_message.host_request_status_target = HostRequestStatus.cancelled
492 host_request.status = HostRequestStatus.cancelled
493 session.flush()
495 notify(
496 session,
497 user_id=host_request.host_user_id,
498 topic_action="host_request:cancel",
499 key=host_request.conversation_id,
500 data=notification_data_pb2.HostRequestCancel(
501 host_request=host_request_to_pb(host_request, session, context),
502 surfer=user_model_to_pb(host_request.surfer, session, context),
503 ),
504 )
506 count_host_response(host_request.host_user_id, "cancelled")
508 control_message.message_type = MessageType.host_request_status_changed
509 control_message.conversation_id = host_request.conversation_id
510 control_message.author_id = context.user_id
511 session.add(control_message)
513 if request.text:
514 latest_message = Message()
515 latest_message.conversation_id = host_request.conversation_id
516 latest_message.text = request.text
517 latest_message.author_id = context.user_id
518 latest_message.message_type = MessageType.text
519 session.add(latest_message)
520 else:
521 latest_message = control_message
523 session.flush()
525 if host_request.surfer_user_id == context.user_id:
526 host_request.surfer_last_seen_message_id = latest_message.id
527 else:
528 host_request.host_last_seen_message_id = latest_message.id
529 session.commit()
531 return empty_pb2.Empty()
533 def GetHostRequestMessages(self, request, context, session):
534 host_request = session.execute(
535 select(HostRequest).where(HostRequest.conversation_id == request.host_request_id)
536 ).scalar_one_or_none()
538 if not host_request:
539 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
541 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
542 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
544 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
545 pagination = min(pagination, MAX_PAGE_SIZE)
547 messages = (
548 session.execute(
549 select(Message)
550 .where(Message.conversation_id == host_request.conversation_id)
551 .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
552 .order_by(Message.id.desc())
553 .limit(pagination + 1)
554 )
555 .scalars()
556 .all()
557 )
559 no_more = len(messages) <= pagination
561 last_message_id = min(m.id if m else 1 for m in messages[:pagination]) if len(messages) > 0 else 0
563 return requests_pb2.GetHostRequestMessagesRes(
564 last_message_id=last_message_id,
565 no_more=no_more,
566 messages=[message_to_pb(message) for message in messages[:pagination]],
567 )
569 def SendHostRequestMessage(self, request, context, session):
570 if request.text == "":
571 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")
572 host_request = session.execute(
573 select(HostRequest).where(HostRequest.conversation_id == request.host_request_id)
574 ).scalar_one_or_none()
576 if not host_request:
577 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
579 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
580 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
582 if host_request.host_user_id == context.user_id:
583 _possibly_observe_first_response_time(session, host_request, context.user_id, "message")
585 message = Message()
586 message.conversation_id = host_request.conversation_id
587 message.author_id = context.user_id
588 message.message_type = MessageType.text
589 message.text = request.text
590 session.add(message)
591 session.flush()
593 if host_request.surfer_user_id == context.user_id:
594 host_request.surfer_last_seen_message_id = message.id
596 notify(
597 session,
598 user_id=host_request.host_user_id,
599 topic_action="host_request:message",
600 key=host_request.conversation_id,
601 data=notification_data_pb2.HostRequestMessage(
602 host_request=host_request_to_pb(host_request, session, context),
603 user=user_model_to_pb(host_request.surfer, session, context),
604 text=request.text,
605 am_host=True,
606 ),
607 )
609 else:
610 host_request.host_last_seen_message_id = message.id
612 notify(
613 session,
614 user_id=host_request.surfer_user_id,
615 topic_action="host_request:message",
616 key=host_request.conversation_id,
617 data=notification_data_pb2.HostRequestMessage(
618 host_request=host_request_to_pb(host_request, session, context),
619 user=user_model_to_pb(host_request.host, session, context),
620 text=request.text,
621 am_host=False,
622 ),
623 )
625 session.commit()
627 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
628 sent_messages_counter.labels(user_gender, "host request").inc()
630 return empty_pb2.Empty()
632 def GetHostRequestUpdates(self, request, context, session):
633 if request.only_sent and request.only_received:
634 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received")
636 if request.newest_message_id == 0:
637 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")
639 if not session.execute(select(Message).where(Message.id == request.newest_message_id)).scalar_one_or_none():
640 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")
642 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
643 pagination = min(pagination, MAX_PAGE_SIZE)
645 statement = (
646 select(
647 Message,
648 HostRequest.status.label("host_request_status"),
649 HostRequest.conversation_id.label("host_request_id"),
650 )
651 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
652 .where(Message.id > request.newest_message_id)
653 )
655 if request.only_sent:
656 statement = statement.where(HostRequest.surfer_user_id == context.user_id)
657 elif request.only_received:
658 statement = statement.where(HostRequest.host_user_id == context.user_id)
659 else:
660 statement = statement.where(
661 or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id)
662 )
664 statement = statement.order_by(Message.id.asc()).limit(pagination + 1)
665 res = session.execute(statement).all()
667 no_more = len(res) <= pagination
669 last_message_id = min(m.Message.id if m else 1 for m in res[:pagination]) if len(res) > 0 else 0 # TODO
671 return requests_pb2.GetHostRequestUpdatesRes(
672 no_more=no_more,
673 updates=[
674 requests_pb2.HostRequestUpdate(
675 host_request_id=result.host_request_id,
676 status=hostrequeststatus2api[result.host_request_status],
677 message=message_to_pb(result.Message),
678 )
679 for result in res[:pagination]
680 ],
681 )
683 def MarkLastSeenHostRequest(self, request, context, session):
684 host_request = session.execute(
685 select(HostRequest).where(HostRequest.conversation_id == request.host_request_id)
686 ).scalar_one_or_none()
688 if not host_request:
689 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
691 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
692 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
694 if host_request.surfer_user_id == context.user_id:
695 if not host_request.surfer_last_seen_message_id <= request.last_seen_message_id:
696 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
697 host_request.surfer_last_seen_message_id = request.last_seen_message_id
698 else:
699 if not host_request.host_last_seen_message_id <= request.last_seen_message_id:
700 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
701 host_request.host_last_seen_message_id = request.last_seen_message_id
703 session.commit()
704 return empty_pb2.Empty()
706 def SetHostRequestArchiveStatus(self, request, context, session):
707 host_request: HostRequest = session.execute(
708 select(HostRequest)
709 .where(HostRequest.conversation_id == request.host_request_id)
710 .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id))
711 ).scalar_one_or_none()
713 if not host_request:
714 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
716 if context.user_id == host_request.surfer_user_id:
717 host_request.is_surfer_archived = request.is_archived
718 else:
719 host_request.is_host_archived = request.is_archived
721 return requests_pb2.SetHostRequestArchiveStatusRes(
722 host_request_id=host_request.conversation_id,
723 is_archived=request.is_archived,
724 )
726 def GetResponseRate(self, request, context, session):
727 user_res = session.execute(
728 select(User.id, UserResponseRate)
729 .outerjoin(UserResponseRate, UserResponseRate.user_id == User.id)
730 .where_users_visible(context)
731 .where(User.id == request.user_id)
732 ).one_or_none()
734 # if user doesn't exist, return None
735 if not user_res:
736 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
738 user, response_rates = user_res
739 return requests_pb2.GetResponseRateRes(**response_rate_to_pb(response_rates))
741 def SendHostRequestFeedback(self, request, context, session):
742 host_request = session.execute(
743 select(HostRequest)
744 .where(HostRequest.conversation_id == request.host_request_id)
745 .where(HostRequest.host_user_id == context.user_id)
746 ).scalar_one_or_none()
748 if not host_request:
749 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
751 feedback = session.execute(
752 select(HostRequestFeedback)
753 .where(HostRequestFeedback.host_request_id == host_request.conversation_id)
754 .where(HostRequestFeedback.from_user_id == context.user_id)
755 ).scalar_one_or_none()
757 if feedback:
758 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_left_host_request_feedback")
760 session.add(
761 HostRequestFeedback(
762 host_request_id=host_request.conversation_id,
763 from_user_id=host_request.host_user_id,
764 to_user_id=host_request.surfer_user_id,
765 request_quality=hostrequestquality2sql.get(request.host_request_quality),
766 decline_reason=request.decline_reason,
767 )
768 )
770 return empty_pb2.Empty()