Coverage for src/couchers/servicers/requests.py: 93%
284 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-08-28 14:55 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-08-28 14:55 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy import exists
7from sqlalchemy.orm import aliased
8from sqlalchemy.sql import and_, func, or_
10from couchers import errors
11from couchers.materialized_views import UserResponseRate
12from couchers.metrics import (
13 account_age_on_host_request_create_histogram,
14 host_request_first_response_histogram,
15 host_request_responses_counter,
16 host_requests_sent_counter,
17 sent_messages_counter,
18)
19from couchers.models import (
20 Conversation,
21 HostRequest,
22 HostRequestFeedback,
23 HostRequestQuality,
24 HostRequestStatus,
25 Message,
26 MessageType,
27 RateLimitAction,
28 User,
29)
30from couchers.notifications.notify import notify
31from couchers.rate_limits.check import process_rate_limits_and_check_abort
32from couchers.servicers.api import response_rate_to_pb, user_model_to_pb
33from couchers.sql import couchers_select as select
34from couchers.utils import (
35 Timestamp_from_datetime,
36 date_to_api,
37 get_coordinates,
38 now,
39 parse_date,
40 today_in_timezone,
41)
42from proto import conversations_pb2, notification_data_pb2, requests_pb2, requests_pb2_grpc
# Module-level logger, named after this module
logger = logging.getLogger(__name__)

# Page size used when a request does not ask for a positive number of items
DEFAULT_PAGINATION_LENGTH: int = 10
# Upper bound applied to whatever page size a request asks for
MAX_PAGE_SIZE: int = 50
# Translates the database HostRequestStatus enum into the matching API (protobuf) enum value.
# Looked up with [] throughout this module, so a status missing from here raises KeyError.
hostrequeststatus2api = {
    HostRequestStatus.pending: conversations_pb2.HOST_REQUEST_STATUS_PENDING,
    HostRequestStatus.accepted: conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
    HostRequestStatus.rejected: conversations_pb2.HOST_REQUEST_STATUS_REJECTED,
    HostRequestStatus.confirmed: conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED,
    HostRequestStatus.cancelled: conversations_pb2.HOST_REQUEST_STATUS_CANCELLED,
}
# Translates the API (protobuf) host request quality enum into the database HostRequestQuality enum
# stored on HostRequestFeedback.
#
# LOW and OKAY used to be crossed over (LOW -> okay_quality, OKAY -> low_quality), which stored the
# opposite of what the host selected; each now maps to the quality of the same name.
#
# NOTE(review): UNSPECIFIED is left mapped to high_quality exactly as before, and any API value not
# listed here becomes None at the `.get()` lookup in SendHostRequestFeedback -- confirm both are intended.
hostrequestquality2sql = {
    requests_pb2.HOST_REQUEST_QUALITY_UNSPECIFIED: HostRequestQuality.high_quality,
    requests_pb2.HOST_REQUEST_QUALITY_LOW: HostRequestQuality.low_quality,
    requests_pb2.HOST_REQUEST_QUALITY_OKAY: HostRequestQuality.okay_quality,
}
def message_to_pb(message: Message):
    """
    Converts a Message database row into a conversations_pb2.Message

    Normal messages carry their text. Any other message gets `chat_created` content if it is a chat created
    message, `host_request_status_changed` content if it is a status change, and no content otherwise.
    """
    # fields shared by every kind of message
    common = dict(
        message_id=message.id,
        author_user_id=message.author_id,
        time=Timestamp_from_datetime(message.time),
    )

    if message.is_normal_message:
        return conversations_pb2.Message(
            text=conversations_pb2.MessageContentText(text=message.text),
            **common,
        )

    chat_created = None
    if message.message_type == MessageType.chat_created:
        chat_created = conversations_pb2.MessageContentChatCreated()

    status_changed = None
    if message.message_type == MessageType.host_request_status_changed:
        # the target status is only read for status change messages
        status_changed = conversations_pb2.MessageContentHostRequestStatusChanged(
            status=hostrequeststatus2api[message.host_request_status_target]
        )

    return conversations_pb2.Message(
        chat_created=chat_created,
        host_request_status_changed=status_changed,
        **common,
    )
def host_request_to_pb(host_request: HostRequest, session, context):
    """
    Builds the requests_pb2.HostRequest for a host request, from the point of view of the calling user

    The oldest message in the conversation supplies `created` and the newest one `latest_message`. The last seen
    message id is the surfer's if the caller is the surfer, and the host's otherwise. Feedback is only reported
    as needed when the caller is the host of a rejected request and has not left HostRequestFeedback for it.
    """
    conversation_messages = select(Message).where(Message.conversation_id == host_request.conversation_id)

    # lowest message id = first message, highest message id = latest message
    first_message = session.execute(conversation_messages.order_by(Message.id.asc()).limit(1)).scalar_one()
    newest_message = session.execute(conversation_messages.order_by(Message.id.desc()).limit(1)).scalar_one()

    lat, lng = get_coordinates(host_request.hosting_location)

    caller_is_host = context.user_id == host_request.host_user_id
    if caller_is_host and host_request.status == HostRequestStatus.rejected:
        feedback_exists = session.execute(
            select(
                exists().where(
                    HostRequestFeedback.from_user_id == context.user_id,
                    HostRequestFeedback.host_request_id == host_request.conversation_id,
                )
            )
        ).scalar_one()
        need_feedback = not feedback_exists
    else:
        need_feedback = False

    if context.user_id == host_request.surfer_user_id:
        last_seen_message_id = host_request.surfer_last_seen_message_id
    else:
        last_seen_message_id = host_request.host_last_seen_message_id

    return requests_pb2.HostRequest(
        host_request_id=host_request.conversation_id,
        surfer_user_id=host_request.surfer_user_id,
        host_user_id=host_request.host_user_id,
        status=hostrequeststatus2api[host_request.status],
        created=Timestamp_from_datetime(first_message.time),
        from_date=date_to_api(host_request.from_date),
        to_date=date_to_api(host_request.to_date),
        last_seen_message_id=last_seen_message_id,
        latest_message=message_to_pb(newest_message),
        hosting_city=host_request.hosting_city,
        hosting_lat=lat,
        hosting_lng=lng,
        hosting_radius=host_request.hosting_radius,
        need_host_request_feedback=need_feedback,
    )
def _possibly_observe_first_response_time(session, host_request, user_id, response_type):
    """
    Observes the host's time to first response, if the host has no messages in this conversation yet

    The observed value is the number of seconds between the conversation's creation and now, labelled with the
    host's gender, the surfer's gender and `response_type`. `user_id` must be the host of `host_request`.
    """
    assert host_request.host_user_id == user_id

    messages_by_host = (
        select(func.count())
        .where(Message.conversation_id == host_request.conversation_id)
        .where(Message.author_id == user_id)
    )
    if session.execute(messages_by_host).scalar_one_or_none() != 0:
        # the host already wrote something here, so this is not their first response
        return

    host_gender = session.execute(select(User.gender).where(User.id == host_request.host_user_id)).scalar_one()
    surfer_gender = session.execute(select(User.gender).where(User.id == host_request.surfer_user_id)).scalar_one()

    seconds_to_respond = (now() - host_request.conversation.created).total_seconds()
    host_request_first_response_histogram.labels(host_gender, surfer_gender, response_type).observe(
        seconds_to_respond
    )
164class Requests(requests_pb2_grpc.RequestsServicer):
165 def CreateHostRequest(self, request, context, session):
166 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
167 if not user.has_completed_profile:
168 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.INCOMPLETE_PROFILE_SEND_REQUEST)
170 if request.host_user_id == context.user_id:
171 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.CANT_REQUEST_SELF)
173 # just to check host exists and is visible
174 host = session.execute(
175 select(User).where_users_visible(context).where(User.id == request.host_user_id)
176 ).scalar_one_or_none()
177 if not host:
178 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
180 from_date = parse_date(request.from_date)
181 to_date = parse_date(request.to_date)
183 if not from_date or not to_date:
184 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_DATE)
186 today = today_in_timezone(host.timezone)
188 # request starts from the past
189 if from_date < today:
190 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_FROM_BEFORE_TODAY)
192 # from_date is not >= to_date
193 if from_date >= to_date:
194 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_FROM_AFTER_TO)
196 # No need to check today > to_date
198 if from_date - today > timedelta(days=365):
199 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_FROM_AFTER_ONE_YEAR)
201 if to_date - from_date > timedelta(days=365):
202 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_TO_AFTER_ONE_YEAR)
204 # Check if user has been sending host requests excessively
205 if process_rate_limits_and_check_abort(
206 session=session, user_id=context.user_id, action=RateLimitAction.host_request
207 ):
208 context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, errors.HOST_REQUEST_RATE_LIMIT)
210 conversation = Conversation()
211 session.add(conversation)
212 session.flush()
214 session.add(
215 Message(
216 conversation_id=conversation.id,
217 author_id=context.user_id,
218 message_type=MessageType.chat_created,
219 )
220 )
222 message = Message(
223 conversation_id=conversation.id,
224 author_id=context.user_id,
225 text=request.text,
226 message_type=MessageType.text,
227 )
228 session.add(message)
229 session.flush()
231 host_request = HostRequest(
232 conversation_id=conversation.id,
233 surfer_user_id=context.user_id,
234 host_user_id=host.id,
235 from_date=from_date,
236 to_date=to_date,
237 status=HostRequestStatus.pending,
238 surfer_last_seen_message_id=message.id,
239 # TODO: tz
240 # timezone=host.timezone,
241 hosting_city=host.city,
242 hosting_location=host.geom,
243 hosting_radius=host.geom_radius,
244 )
245 session.add(host_request)
246 session.commit()
248 notify(
249 session,
250 user_id=host_request.host_user_id,
251 topic_action="host_request:create",
252 key=host_request.conversation_id,
253 data=notification_data_pb2.HostRequestCreate(
254 host_request=host_request_to_pb(host_request, session, context),
255 surfer=user_model_to_pb(host_request.surfer, session, context),
256 text=request.text,
257 ),
258 )
260 host_requests_sent_counter.labels(user.gender, host.gender).inc()
261 sent_messages_counter.labels(user.gender, "host request send").inc()
262 account_age_on_host_request_create_histogram.labels(user.gender, host.gender).observe(
263 (now() - user.joined).total_seconds()
264 )
266 return requests_pb2.CreateHostRequestRes(host_request_id=host_request.conversation_id)
268 def GetHostRequest(self, request, context, session):
269 host_request = session.execute(
270 select(HostRequest)
271 .where_users_column_visible(context, HostRequest.surfer_user_id)
272 .where_users_column_visible(context, HostRequest.host_user_id)
273 .where(HostRequest.conversation_id == request.host_request_id)
274 .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id))
275 ).scalar_one_or_none()
277 if not host_request:
278 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)
280 return host_request_to_pb(host_request, session, context)
282 def ListHostRequests(self, request, context, session):
283 if request.only_sent and request.only_received:
284 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.HOST_REQUEST_SENT_OR_RECEIVED)
286 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
287 pagination = min(pagination, MAX_PAGE_SIZE)
289 # By outer joining messages on itself where the second id is bigger, only the highest IDs will have
290 # none as message_2.id. So just filter for these ones to get highest messages only.
291 # See https://stackoverflow.com/a/27802817/6115336
292 message_2 = aliased(Message)
293 statement = (
294 select(Message, HostRequest, Conversation)
295 .outerjoin(message_2, and_(Message.conversation_id == message_2.conversation_id, Message.id < message_2.id))
296 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
297 .join(Conversation, Conversation.id == HostRequest.conversation_id)
298 .where_users_column_visible(context, HostRequest.surfer_user_id)
299 .where_users_column_visible(context, HostRequest.host_user_id)
300 .where(message_2.id == None)
301 .where(or_(Message.id < request.last_request_id, request.last_request_id == 0))
302 )
304 if request.only_sent:
305 statement = statement.where(HostRequest.surfer_user_id == context.user_id)
306 elif request.only_received:
307 statement = statement.where(HostRequest.host_user_id == context.user_id)
308 elif request.HasField("only_archived"):
309 statement = statement.where(
310 or_(
311 and_(
312 HostRequest.surfer_user_id == context.user_id,
313 HostRequest.is_surfer_archived == request.only_archived,
314 ),
315 and_(
316 HostRequest.host_user_id == context.user_id,
317 HostRequest.is_host_archived == request.only_archived,
318 ),
319 )
320 )
321 else:
322 statement = statement.where(
323 or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id)
324 )
326 # TODO: I considered having the latest control message be the single source of truth for
327 # the HostRequest.status, but decided against it because of this filter.
328 # Another possibility is to filter in the python instead of SQL, but that's slower
329 if request.only_active:
330 statement = statement.where(
331 or_(
332 HostRequest.status == HostRequestStatus.pending,
333 HostRequest.status == HostRequestStatus.accepted,
334 HostRequest.status == HostRequestStatus.confirmed,
335 )
336 )
337 statement = statement.where(HostRequest.end_time <= func.now())
339 statement = statement.order_by(Message.id.desc()).limit(pagination + 1)
340 results = session.execute(statement).all()
342 host_requests = []
343 for result in results[:pagination]:
344 lat, lng = get_coordinates(result.HostRequest.hosting_location)
345 host_requests.append(
346 requests_pb2.HostRequest(
347 host_request_id=result.HostRequest.conversation_id,
348 surfer_user_id=result.HostRequest.surfer_user_id,
349 host_user_id=result.HostRequest.host_user_id,
350 status=hostrequeststatus2api[result.HostRequest.status],
351 created=Timestamp_from_datetime(result.Conversation.created),
352 from_date=date_to_api(result.HostRequest.from_date),
353 to_date=date_to_api(result.HostRequest.to_date),
354 last_seen_message_id=(
355 result.HostRequest.surfer_last_seen_message_id
356 if context.user_id == result.HostRequest.surfer_user_id
357 else result.HostRequest.host_last_seen_message_id
358 ),
359 latest_message=message_to_pb(result.Message),
360 hosting_city=result.HostRequest.hosting_city,
361 hosting_lat=lat,
362 hosting_lng=lng,
363 hosting_radius=result.HostRequest.hosting_radius,
364 )
365 )
367 last_request_id = min(g.Message.id for g in results[:pagination]) if len(results) > pagination else 0 # TODO
368 no_more = len(results) <= pagination
370 return requests_pb2.ListHostRequestsRes(
371 last_request_id=last_request_id, no_more=no_more, host_requests=host_requests
372 )
374 def RespondHostRequest(self, request, context, session):
375 def count_host_response(other_user_id, response_type):
376 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
377 other_gender = session.execute(select(User.gender).where(User.id == other_user_id)).scalar_one()
378 host_request_responses_counter.labels(user_gender, other_gender, response_type).inc()
379 sent_messages_counter.labels(user_gender, "host request response").inc()
381 host_request = session.execute(
382 select(HostRequest)
383 .where_users_column_visible(context, HostRequest.surfer_user_id)
384 .where_users_column_visible(context, HostRequest.host_user_id)
385 .where(HostRequest.conversation_id == request.host_request_id)
386 ).scalar_one_or_none()
388 if not host_request:
389 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)
391 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
392 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)
394 if request.status == conversations_pb2.HOST_REQUEST_STATUS_PENDING:
395 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
397 if host_request.end_time < now():
398 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.HOST_REQUEST_IN_PAST)
400 control_message = Message()
402 if request.status == conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED:
403 # only host can accept
404 if context.user_id != host_request.host_user_id:
405 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.NOT_THE_HOST)
406 # can't accept a cancelled or confirmed request (only reject), or already accepted
407 if (
408 host_request.status == HostRequestStatus.cancelled
409 or host_request.status == HostRequestStatus.confirmed
410 or host_request.status == HostRequestStatus.accepted
411 ):
412 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
413 _possibly_observe_first_response_time(session, host_request, context.user_id, "accepted")
414 control_message.host_request_status_target = HostRequestStatus.accepted
415 host_request.status = HostRequestStatus.accepted
416 session.flush()
418 notify(
419 session,
420 user_id=host_request.surfer_user_id,
421 topic_action="host_request:accept",
422 key=host_request.conversation_id,
423 data=notification_data_pb2.HostRequestAccept(
424 host_request=host_request_to_pb(host_request, session, context),
425 host=user_model_to_pb(host_request.host, session, context),
426 ),
427 )
429 count_host_response(host_request.surfer_user_id, "accepted")
431 if request.status == conversations_pb2.HOST_REQUEST_STATUS_REJECTED:
432 # only host can reject
433 if context.user_id != host_request.host_user_id:
434 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
435 # can't reject a cancelled or already rejected request
436 if host_request.status == HostRequestStatus.cancelled or host_request.status == HostRequestStatus.rejected:
437 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
438 _possibly_observe_first_response_time(session, host_request, context.user_id, "rejected")
439 control_message.host_request_status_target = HostRequestStatus.rejected
440 host_request.status = HostRequestStatus.rejected
441 session.flush()
443 notify(
444 session,
445 user_id=host_request.surfer_user_id,
446 topic_action="host_request:reject",
447 key=host_request.conversation_id,
448 data=notification_data_pb2.HostRequestReject(
449 host_request=host_request_to_pb(host_request, session, context),
450 host=user_model_to_pb(host_request.host, session, context),
451 ),
452 )
454 count_host_response(host_request.surfer_user_id, "rejected")
456 if request.status == conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED:
457 # only surfer can confirm
458 if context.user_id != host_request.surfer_user_id:
459 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
460 # can only confirm an accepted request
461 if host_request.status != HostRequestStatus.accepted:
462 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
463 control_message.host_request_status_target = HostRequestStatus.confirmed
464 host_request.status = HostRequestStatus.confirmed
465 session.flush()
467 notify(
468 session,
469 user_id=host_request.host_user_id,
470 topic_action="host_request:confirm",
471 key=host_request.conversation_id,
472 data=notification_data_pb2.HostRequestConfirm(
473 host_request=host_request_to_pb(host_request, session, context),
474 surfer=user_model_to_pb(host_request.surfer, session, context),
475 ),
476 )
478 count_host_response(host_request.host_user_id, "confirmed")
480 if request.status == conversations_pb2.HOST_REQUEST_STATUS_CANCELLED:
481 # only surfer can cancel
482 if context.user_id != host_request.surfer_user_id:
483 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
484 # can't' cancel an already cancelled or rejected request
485 if host_request.status == HostRequestStatus.rejected or host_request.status == HostRequestStatus.cancelled:
486 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
487 control_message.host_request_status_target = HostRequestStatus.cancelled
488 host_request.status = HostRequestStatus.cancelled
489 session.flush()
491 notify(
492 session,
493 user_id=host_request.host_user_id,
494 topic_action="host_request:cancel",
495 key=host_request.conversation_id,
496 data=notification_data_pb2.HostRequestCancel(
497 host_request=host_request_to_pb(host_request, session, context),
498 surfer=user_model_to_pb(host_request.surfer, session, context),
499 ),
500 )
502 count_host_response(host_request.host_user_id, "cancelled")
504 control_message.message_type = MessageType.host_request_status_changed
505 control_message.conversation_id = host_request.conversation_id
506 control_message.author_id = context.user_id
507 session.add(control_message)
509 if request.text:
510 latest_message = Message()
511 latest_message.conversation_id = host_request.conversation_id
512 latest_message.text = request.text
513 latest_message.author_id = context.user_id
514 latest_message.message_type = MessageType.text
515 session.add(latest_message)
516 else:
517 latest_message = control_message
519 session.flush()
521 if host_request.surfer_user_id == context.user_id:
522 host_request.surfer_last_seen_message_id = latest_message.id
523 else:
524 host_request.host_last_seen_message_id = latest_message.id
525 session.commit()
527 return empty_pb2.Empty()
529 def GetHostRequestMessages(self, request, context, session):
530 host_request = session.execute(
531 select(HostRequest).where(HostRequest.conversation_id == request.host_request_id)
532 ).scalar_one_or_none()
534 if not host_request:
535 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)
537 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
538 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)
540 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
541 pagination = min(pagination, MAX_PAGE_SIZE)
543 messages = (
544 session.execute(
545 select(Message)
546 .where(Message.conversation_id == host_request.conversation_id)
547 .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
548 .order_by(Message.id.desc())
549 .limit(pagination + 1)
550 )
551 .scalars()
552 .all()
553 )
555 no_more = len(messages) <= pagination
557 last_message_id = min(m.id if m else 1 for m in messages[:pagination]) if len(messages) > 0 else 0
559 return requests_pb2.GetHostRequestMessagesRes(
560 last_message_id=last_message_id,
561 no_more=no_more,
562 messages=[message_to_pb(message) for message in messages[:pagination]],
563 )
565 def SendHostRequestMessage(self, request, context, session):
566 if request.text == "":
567 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE)
568 host_request = session.execute(
569 select(HostRequest).where(HostRequest.conversation_id == request.host_request_id)
570 ).scalar_one_or_none()
572 if not host_request:
573 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)
575 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
576 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)
578 if host_request.host_user_id == context.user_id:
579 _possibly_observe_first_response_time(session, host_request, context.user_id, "message")
581 message = Message()
582 message.conversation_id = host_request.conversation_id
583 message.author_id = context.user_id
584 message.message_type = MessageType.text
585 message.text = request.text
586 session.add(message)
587 session.flush()
589 if host_request.surfer_user_id == context.user_id:
590 host_request.surfer_last_seen_message_id = message.id
592 notify(
593 session,
594 user_id=host_request.host_user_id,
595 topic_action="host_request:message",
596 key=host_request.conversation_id,
597 data=notification_data_pb2.HostRequestMessage(
598 host_request=host_request_to_pb(host_request, session, context),
599 user=user_model_to_pb(host_request.surfer, session, context),
600 text=request.text,
601 am_host=True,
602 ),
603 )
605 else:
606 host_request.host_last_seen_message_id = message.id
608 notify(
609 session,
610 user_id=host_request.surfer_user_id,
611 topic_action="host_request:message",
612 key=host_request.conversation_id,
613 data=notification_data_pb2.HostRequestMessage(
614 host_request=host_request_to_pb(host_request, session, context),
615 user=user_model_to_pb(host_request.host, session, context),
616 text=request.text,
617 am_host=False,
618 ),
619 )
621 session.commit()
623 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
624 sent_messages_counter.labels(user_gender, "host request").inc()
626 return empty_pb2.Empty()
628 def GetHostRequestUpdates(self, request, context, session):
629 if request.only_sent and request.only_received:
630 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.HOST_REQUEST_SENT_OR_RECEIVED)
632 if request.newest_message_id == 0:
633 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE)
635 if not session.execute(select(Message).where(Message.id == request.newest_message_id)).scalar_one_or_none():
636 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE)
638 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
639 pagination = min(pagination, MAX_PAGE_SIZE)
641 statement = (
642 select(
643 Message,
644 HostRequest.status.label("host_request_status"),
645 HostRequest.conversation_id.label("host_request_id"),
646 )
647 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
648 .where(Message.id > request.newest_message_id)
649 )
651 if request.only_sent:
652 statement = statement.where(HostRequest.surfer_user_id == context.user_id)
653 elif request.only_received:
654 statement = statement.where(HostRequest.host_user_id == context.user_id)
655 else:
656 statement = statement.where(
657 or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id)
658 )
660 statement = statement.order_by(Message.id.asc()).limit(pagination + 1)
661 res = session.execute(statement).all()
663 no_more = len(res) <= pagination
665 last_message_id = min(m.Message.id if m else 1 for m in res[:pagination]) if len(res) > 0 else 0 # TODO
667 return requests_pb2.GetHostRequestUpdatesRes(
668 no_more=no_more,
669 updates=[
670 requests_pb2.HostRequestUpdate(
671 host_request_id=result.host_request_id,
672 status=hostrequeststatus2api[result.host_request_status],
673 message=message_to_pb(result.Message),
674 )
675 for result in res[:pagination]
676 ],
677 )
679 def MarkLastSeenHostRequest(self, request, context, session):
680 host_request = session.execute(
681 select(HostRequest).where(HostRequest.conversation_id == request.host_request_id)
682 ).scalar_one_or_none()
684 if not host_request:
685 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)
687 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
688 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)
690 if host_request.surfer_user_id == context.user_id:
691 if not host_request.surfer_last_seen_message_id <= request.last_seen_message_id:
692 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNSEE_MESSAGES)
693 host_request.surfer_last_seen_message_id = request.last_seen_message_id
694 else:
695 if not host_request.host_last_seen_message_id <= request.last_seen_message_id:
696 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNSEE_MESSAGES)
697 host_request.host_last_seen_message_id = request.last_seen_message_id
699 session.commit()
700 return empty_pb2.Empty()
702 def SetHostRequestArchiveStatus(self, request, context, session):
703 host_request: HostRequest = session.execute(
704 select(HostRequest)
705 .where(HostRequest.conversation_id == request.host_request_id)
706 .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id))
707 ).scalar_one_or_none()
709 if not host_request:
710 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)
712 if context.user_id == host_request.surfer_user_id:
713 host_request.is_surfer_archived = request.is_archived
714 else:
715 host_request.is_host_archived = request.is_archived
717 return requests_pb2.SetHostRequestArchiveStatusRes(
718 host_request_id=host_request.conversation_id,
719 is_archived=request.is_archived,
720 )
722 def GetResponseRate(self, request, context, session):
723 user_res = session.execute(
724 select(User.id, UserResponseRate)
725 .outerjoin(UserResponseRate, UserResponseRate.user_id == User.id)
726 .where_users_visible(context)
727 .where(User.id == request.user_id)
728 ).one_or_none()
730 # if user doesn't exist, return None
731 if not user_res:
732 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
734 user, response_rates = user_res
735 return requests_pb2.GetResponseRateRes(**response_rate_to_pb(response_rates))
737 def SendHostRequestFeedback(self, request, context, session):
738 host_request = session.execute(
739 select(HostRequest)
740 .where(HostRequest.conversation_id == request.host_request_id)
741 .where(HostRequest.host_user_id == context.user_id)
742 ).scalar_one_or_none()
744 if not host_request:
745 context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)
747 feedback = session.execute(
748 select(HostRequestFeedback)
749 .where(HostRequestFeedback.host_request_id == host_request.conversation_id)
750 .where(HostRequestFeedback.from_user_id == context.user_id)
751 ).scalar_one_or_none()
753 if feedback:
754 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_LEFT_HOST_REQUEST_FEEDBACK)
756 session.add(
757 HostRequestFeedback(
758 host_request_id=host_request.conversation_id,
759 from_user_id=host_request.host_user_id,
760 to_user_id=host_request.surfer_user_id,
761 request_quality=hostrequestquality2sql.get(request.host_request_quality),
762 decline_reason=request.decline_reason,
763 )
764 )
766 return empty_pb2.Empty()