Coverage for app/backend/src/couchers/servicers/requests.py: 92%
339 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy import exists, select
7from sqlalchemy.orm import Session, aliased
8from sqlalchemy.sql import and_, func, or_
10from couchers.constants import HOST_REQUEST_MIN_LENGTH_UTF16
11from couchers.context import CouchersContext, make_notification_user_context
12from couchers.db import can_moderate_node
13from couchers.event_log import log_event
14from couchers.helpers.completed_profile import has_completed_profile
15from couchers.materialized_views import UserResponseRate
16from couchers.metrics import (
17 account_age_on_host_request_create_histogram,
18 host_request_first_response_histogram,
19 host_request_responses_counter,
20 host_requests_sent_counter,
21 sent_messages_counter,
22)
23from couchers.models import (
24 Conversation,
25 HostRequest,
26 HostRequestFeedback,
27 HostRequestQuality,
28 HostRequestStatus,
29 Message,
30 MessageType,
31 ModerationObjectType,
32 RateLimitAction,
33 User,
34)
35from couchers.models.notifications import NotificationTopicAction
36from couchers.models.public_trips import PublicTrip, PublicTripStatus
37from couchers.moderation.utils import create_moderation
38from couchers.notifications.notify import mark_notifications_seen, notify
39from couchers.proto import conversations_pb2, notification_data_pb2, requests_pb2, requests_pb2_grpc
40from couchers.rate_limits.check import process_rate_limits_and_check_abort
41from couchers.rate_limits.definitions import RATE_LIMIT_HOURS
42from couchers.servicers.api import response_rate_to_pb, user_model_to_pb
43from couchers.sql import to_bool, users_visible, where_moderated_content_visible, where_users_column_visible
44from couchers.utils import (
45 Timestamp_from_datetime,
46 date_to_api,
47 get_coordinates,
48 now,
49 parse_date,
50 today_in_timezone,
51)
53logger = logging.getLogger(__name__)
55DEFAULT_PAGINATION_LENGTH = 10
56MAX_PAGE_SIZE = 50
59hostrequeststatus2api = {
60 HostRequestStatus.pending: conversations_pb2.HOST_REQUEST_STATUS_PENDING,
61 HostRequestStatus.accepted: conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
62 HostRequestStatus.rejected: conversations_pb2.HOST_REQUEST_STATUS_REJECTED,
63 HostRequestStatus.confirmed: conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED,
64 HostRequestStatus.cancelled: conversations_pb2.HOST_REQUEST_STATUS_CANCELLED,
65}
67api2hostrequeststatus = {
68 conversations_pb2.HOST_REQUEST_STATUS_PENDING: HostRequestStatus.pending,
69 conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED: HostRequestStatus.accepted,
70 conversations_pb2.HOST_REQUEST_STATUS_REJECTED: HostRequestStatus.rejected,
71 conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED: HostRequestStatus.confirmed,
72 conversations_pb2.HOST_REQUEST_STATUS_CANCELLED: HostRequestStatus.cancelled,
73}
75hostrequestquality2sql = {
76 requests_pb2.HOST_REQUEST_QUALITY_UNSPECIFIED: HostRequestQuality.high_quality,
77 requests_pb2.HOST_REQUEST_QUALITY_LOW: HostRequestQuality.okay_quality,
78 requests_pb2.HOST_REQUEST_QUALITY_OKAY: HostRequestQuality.low_quality,
79}
82def message_to_pb(message: Message) -> conversations_pb2.Message:
83 """
84 Turns the given message to a protocol buffer
85 """
86 if message.is_normal_message:
87 return conversations_pb2.Message(
88 message_id=message.id,
89 author_user_id=message.author_id,
90 time=Timestamp_from_datetime(message.time),
91 text=conversations_pb2.MessageContentText(text=message.text),
92 )
93 else:
94 return conversations_pb2.Message(
95 message_id=message.id,
96 author_user_id=message.author_id,
97 time=Timestamp_from_datetime(message.time),
98 chat_created=(
99 conversations_pb2.MessageContentChatCreated()
100 if message.message_type == MessageType.chat_created
101 else None
102 ),
103 host_request_status_changed=(
104 conversations_pb2.MessageContentHostRequestStatusChanged(
105 status=hostrequeststatus2api[message.host_request_status_target] # type: ignore[index]
106 )
107 if message.message_type == MessageType.host_request_status_changed
108 else None
109 ),
110 )
113def host_request_to_pb(
114 host_request: HostRequest, session: Session, context: CouchersContext
115) -> requests_pb2.HostRequest:
116 initial_message = session.execute(
117 select(Message)
118 .where(Message.conversation_id == host_request.conversation_id)
119 .order_by(Message.id.asc())
120 .limit(1)
121 ).scalar_one()
123 latest_message = session.execute(
124 select(Message)
125 .where(Message.conversation_id == host_request.conversation_id)
126 .order_by(Message.id.desc())
127 .limit(1)
128 ).scalar_one()
130 lat, lng = get_coordinates(host_request.hosting_location)
132 need_feedback = False
133 if context.user_id == host_request.recipient_user_id and host_request.status == HostRequestStatus.rejected:
134 need_feedback = not session.execute(
135 select(
136 exists().where(
137 HostRequestFeedback.from_user_id == context.user_id,
138 HostRequestFeedback.host_request_id == host_request.conversation_id,
139 )
140 )
141 ).scalar_one()
143 return requests_pb2.HostRequest(
144 host_request_id=host_request.conversation_id,
145 surfer_user_id=host_request.initiator_user_id,
146 host_user_id=host_request.recipient_user_id,
147 status=hostrequeststatus2api[host_request.status],
148 created=Timestamp_from_datetime(initial_message.time),
149 from_date=date_to_api(host_request.from_date),
150 to_date=date_to_api(host_request.to_date),
151 last_seen_message_id=(
152 host_request.initiator_last_seen_message_id
153 if context.user_id == host_request.initiator_user_id
154 else host_request.recipient_last_seen_message_id
155 ),
156 latest_message=message_to_pb(latest_message),
157 hosting_city=host_request.hosting_city,
158 hosting_lat=lat,
159 hosting_lng=lng,
160 hosting_radius=host_request.hosting_radius,
161 need_host_request_feedback=need_feedback,
162 is_archived=(
163 host_request.is_recipient_archived
164 if context.user_id == host_request.recipient_user_id
165 else host_request.is_initiator_archived
166 ),
167 public_trip_id=host_request.public_trip_id,
168 )
171def _possibly_observe_first_response_time(
172 session: Session, host_request: HostRequest, user_id: int, response_type: str
173) -> None:
174 # if this is the first response then there's nothing by this user yet
175 assert host_request.recipient_user_id == user_id
177 number_messages_by_host = session.execute(
178 select(func.count())
179 .where(Message.conversation_id == host_request.conversation_id)
180 .where(Message.author_id == user_id)
181 ).scalar_one_or_none()
183 if number_messages_by_host == 0:
184 host_gender = session.execute(select(User.gender).where(User.id == host_request.recipient_user_id)).scalar_one()
185 surfer_gender = session.execute(
186 select(User.gender).where(User.id == host_request.initiator_user_id)
187 ).scalar_one()
188 host_request_first_response_histogram.labels(host_gender, surfer_gender, response_type).observe(
189 (now() - host_request.conversation.created).total_seconds()
190 )
193def _is_host_request_long_enough(text: str) -> bool:
194 # Python's len(str) does not match Javascript's string.length.
195 # e.g. len("é") == 2 but "é".length == 1.
196 # To match the frontend's validation, measure the string in utf16 code units.
197 text_length_utf16 = len(text.encode("utf-16-le")) // 2 # utf-16-le does not include a prefix BOM code unit.
198 return text_length_utf16 >= HOST_REQUEST_MIN_LENGTH_UTF16
201class Requests(requests_pb2_grpc.RequestsServicer):
202 def CreateHostRequest(
203 self, request: requests_pb2.CreateHostRequestReq, context: CouchersContext, session: Session
204 ) -> requests_pb2.CreateHostRequestRes:
205 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
206 if not has_completed_profile(session, user):
207 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_request")
209 if request.host_user_id == context.user_id:
210 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_request_self")
212 # just to check recipient exists and is visible
213 recipient = session.execute(
214 select(User).where(users_visible(context, User)).where(User.id == request.host_user_id)
215 ).scalar_one_or_none()
216 if not recipient:
217 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
219 from_date = parse_date(request.from_date)
220 to_date = parse_date(request.to_date)
222 if not from_date or not to_date:
223 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_date")
225 today = today_in_timezone(recipient.timezone)
227 # request starts from the past
228 if from_date < today:
229 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_before_today")
231 # from_date is not >= to_date
232 if from_date >= to_date:
233 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_to")
235 # No need to check today > to_date
237 if from_date - today > timedelta(days=365):
238 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_one_year")
240 if to_date - from_date > timedelta(days=365):
241 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_to_after_one_year")
243 # Check minimum length
244 if not _is_host_request_long_enough(request.text):
245 context.abort_with_error_code(
246 grpc.StatusCode.INVALID_ARGUMENT,
247 "host_request_too_short2",
248 substitutions={"count": HOST_REQUEST_MIN_LENGTH_UTF16},
249 )
251 # Check if user has been sending host requests excessively
252 if process_rate_limits_and_check_abort(
253 session=session, user_id=context.user_id, action=RateLimitAction.host_request
254 ):
255 context.abort_with_error_code(
256 grpc.StatusCode.RESOURCE_EXHAUSTED,
257 "host_request_rate_limit2",
258 substitutions={"count": RATE_LIMIT_HOURS},
259 )
261 # If this is an offer in response to a public trip, validate it
262 public_trip_id = request.public_trip_id if request.HasField("public_trip_id") else None
263 if public_trip_id is not None:
264 public_trip = session.execute(
265 select(PublicTrip).where(PublicTrip.id == public_trip_id)
266 ).scalar_one_or_none()
267 if not public_trip:
268 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "public_trip_not_found")
269 # The trip's traveler must be the recipient of this host request (role reversal)
270 if public_trip.user_id != recipient.id:
271 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "public_trip_user_mismatch")
272 # Trip must still be active
273 if public_trip.status != PublicTripStatus.searching_for_host:
274 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "public_trip_not_active")
275 # Offered dates must fall within the trip's window (host can shorten, not extend)
276 if from_date < public_trip.from_date or to_date > public_trip.to_date:
277 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "public_trip_dates_out_of_range")
278 # Enforce same_gender_only restriction (community moderators bypass)
279 if (
280 public_trip.same_gender_only
281 and not can_moderate_node(session, context.user_id, public_trip.node_id)
282 and user.gender != recipient.gender
283 ):
284 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "public_trip_same_gender_only")
285 # Prevent duplicate offers on the same trip
286 existing_offer = session.execute(
287 select(HostRequest)
288 .where(HostRequest.public_trip_id == public_trip_id)
289 .where(HostRequest.initiator_user_id == context.user_id)
290 ).scalar_one_or_none()
291 if existing_offer:
292 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "duplicate_host_request_for_trip")
294 conversation = Conversation()
295 session.add(conversation)
296 session.flush()
298 session.add(
299 Message(
300 conversation_id=conversation.id,
301 author_id=context.user_id,
302 message_type=MessageType.chat_created,
303 )
304 )
306 message = Message(
307 conversation_id=conversation.id,
308 author_id=context.user_id,
309 text=request.text,
310 message_type=MessageType.text,
311 )
312 session.add(message)
313 session.flush()
315 # Create moderation state for UMS (starts as SHADOWED)
316 moderation_state = create_moderation(
317 session=session,
318 object_type=ModerationObjectType.host_request,
319 object_id=conversation.id,
320 creator_user_id=context.user_id,
321 )
323 host_request = HostRequest(
324 conversation_id=conversation.id,
325 initiator_user_id=context.user_id,
326 recipient_user_id=recipient.id,
327 moderation_state_id=moderation_state.id,
328 from_date=from_date,
329 to_date=to_date,
330 status=HostRequestStatus.pending,
331 initiator_last_seen_message_id=message.id,
332 # TODO: tz
333 # timezone=recipient.timezone,
334 hosting_city=recipient.city,
335 hosting_location=recipient.geom,
336 hosting_radius=recipient.geom_radius,
337 public_trip_id=public_trip_id,
338 )
339 session.add(host_request)
340 session.flush()
342 recipient_context = make_notification_user_context(user_id=host_request.recipient_user_id)
343 notify(
344 session,
345 user_id=host_request.recipient_user_id,
346 topic_action=NotificationTopicAction.host_request__create,
347 key=str(host_request.conversation_id),
348 data=notification_data_pb2.HostRequestCreate(
349 host_request=host_request_to_pb(host_request, session, recipient_context),
350 surfer=user_model_to_pb(host_request.initiator, session, recipient_context),
351 text=request.text,
352 ),
353 moderation_state_id=moderation_state.id,
354 )
356 host_requests_sent_counter.labels(user.gender, recipient.gender).inc()
357 sent_messages_counter.labels(user.gender, "host request send").inc()
358 account_age_on_host_request_create_histogram.labels(user.gender, recipient.gender).observe(
359 (now() - user.joined).total_seconds()
360 )
361 log_event(
362 context,
363 session,
364 "host_request.created",
365 {
366 "host_request_id": host_request.conversation_id,
367 "host_id": recipient.id,
368 "surfer_gender": user.gender,
369 "host_gender": recipient.gender,
370 "city": recipient.city,
371 "from_date": str(from_date),
372 "to_date": str(to_date),
373 "nights": (to_date - from_date).days,
374 },
375 )
377 return requests_pb2.CreateHostRequestRes(host_request_id=host_request.conversation_id)
379 def GetHostRequest(
380 self, request: requests_pb2.GetHostRequestReq, context: CouchersContext, session: Session
381 ) -> requests_pb2.HostRequest:
382 host_request = session.execute(
383 where_moderated_content_visible(
384 where_users_column_visible(
385 where_users_column_visible(
386 select(HostRequest),
387 context,
388 HostRequest.initiator_user_id,
389 ),
390 context,
391 HostRequest.recipient_user_id,
392 ),
393 context,
394 HostRequest,
395 is_list_operation=False,
396 )
397 .where(HostRequest.conversation_id == request.host_request_id)
398 .where(
399 or_(HostRequest.initiator_user_id == context.user_id, HostRequest.recipient_user_id == context.user_id)
400 )
401 ).scalar_one_or_none()
403 if not host_request:
404 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
406 return host_request_to_pb(host_request, session, context)
408 def ListHostRequests(
409 self, request: requests_pb2.ListHostRequestsReq, context: CouchersContext, session: Session
410 ) -> requests_pb2.ListHostRequestsRes:
411 if request.only_sent and request.only_received: 411 ↛ 412line 411 didn't jump to line 412 because the condition on line 411 was never true
412 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received")
414 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
415 pagination = min(pagination, MAX_PAGE_SIZE)
417 # By outer joining messages on itself where the second id is bigger, only the highest IDs will have
418 # none as message_2.id. So just filter for these to get the highest messages only.
419 # See https://stackoverflow.com/a/27802817/6115336
420 message_2 = aliased(Message)
421 statement = where_moderated_content_visible(
422 where_users_column_visible(
423 where_users_column_visible(
424 select(Message, HostRequest, Conversation)
425 .outerjoin(
426 message_2, and_(Message.conversation_id == message_2.conversation_id, Message.id < message_2.id)
427 )
428 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
429 .join(Conversation, Conversation.id == HostRequest.conversation_id),
430 context,
431 HostRequest.initiator_user_id,
432 ),
433 context,
434 HostRequest.recipient_user_id,
435 ),
436 context,
437 HostRequest,
438 is_list_operation=True,
439 ).where(message_2.id == None)
441 sort_by_from_date = request.sort_by == requests_pb2.HOST_REQUEST_SORT_BY_FROM_DATE
443 if sort_by_from_date:
444 if request.page_token:
445 token_date_str, token_conv_id_str = request.page_token.split(":")
446 token_date = parse_date(token_date_str)
447 token_conv_id = int(token_conv_id_str)
448 statement = statement.where(
449 or_(
450 HostRequest.from_date > token_date,
451 and_(
452 HostRequest.from_date == token_date,
453 HostRequest.conversation_id > token_conv_id,
454 ),
455 )
456 )
457 else:
458 if request.page_token:
459 statement = statement.where(Message.id < int(request.page_token))
461 if request.only_sent:
462 statement = statement.where(HostRequest.initiator_user_id == context.user_id)
463 elif request.only_received:
464 statement = statement.where(HostRequest.recipient_user_id == context.user_id)
465 elif request.HasField("only_archived"):
466 statement = statement.where(
467 or_(
468 and_(
469 HostRequest.initiator_user_id == context.user_id,
470 HostRequest.is_initiator_archived == request.only_archived,
471 ),
472 and_(
473 HostRequest.recipient_user_id == context.user_id,
474 HostRequest.is_recipient_archived == request.only_archived,
475 ),
476 )
477 )
478 else:
479 statement = statement.where(
480 or_(HostRequest.recipient_user_id == context.user_id, HostRequest.initiator_user_id == context.user_id)
481 )
483 # TODO: I considered having the latest control message be the single source of truth for
484 # the HostRequest.status, but decided against it because of this filter.
485 # Another possibility is to filter in the python instead of SQL, but that's slower
486 if request.only_active:
487 statement = statement.where(
488 or_(
489 HostRequest.status == HostRequestStatus.pending,
490 HostRequest.status == HostRequestStatus.accepted,
491 HostRequest.status == HostRequestStatus.confirmed,
492 )
493 )
494 statement = statement.where(HostRequest.end_time >= func.now())
496 if request.status_in:
497 statement = statement.where(HostRequest.status.in_([api2hostrequeststatus[s] for s in request.status_in]))
499 if sort_by_from_date:
500 statement = statement.order_by(HostRequest.from_date.asc(), HostRequest.conversation_id.asc())
501 else:
502 statement = statement.order_by(Message.id.desc())
503 statement = statement.limit(pagination + 1)
504 results = session.execute(statement).all()
506 host_requests = []
507 for result in results[:pagination]:
508 lat, lng = get_coordinates(result.HostRequest.hosting_location)
509 host_requests.append(
510 requests_pb2.HostRequest(
511 host_request_id=result.HostRequest.conversation_id,
512 surfer_user_id=result.HostRequest.initiator_user_id,
513 host_user_id=result.HostRequest.recipient_user_id,
514 status=hostrequeststatus2api[result.HostRequest.status],
515 created=Timestamp_from_datetime(result.Conversation.created),
516 from_date=date_to_api(result.HostRequest.from_date),
517 to_date=date_to_api(result.HostRequest.to_date),
518 last_seen_message_id=(
519 result.HostRequest.initiator_last_seen_message_id
520 if context.user_id == result.HostRequest.initiator_user_id
521 else result.HostRequest.recipient_last_seen_message_id
522 ),
523 latest_message=message_to_pb(result.Message),
524 hosting_city=result.HostRequest.hosting_city,
525 hosting_lat=lat,
526 hosting_lng=lng,
527 hosting_radius=result.HostRequest.hosting_radius,
528 )
529 )
531 no_more = len(results) <= pagination
533 if len(results) > pagination:
534 if sort_by_from_date:
535 last = results[pagination - 1]
536 next_page_token = f"{date_to_api(last.HostRequest.from_date)}:{last.HostRequest.conversation_id}"
537 else:
538 next_page_token = str(min(g.Message.id for g in results[:pagination]))
539 else:
540 next_page_token = None
542 return requests_pb2.ListHostRequestsRes(
543 next_page_token=next_page_token, no_more=no_more, host_requests=host_requests
544 )
546 def RespondHostRequest(
547 self, request: requests_pb2.RespondHostRequestReq, context: CouchersContext, session: Session
548 ) -> empty_pb2.Empty:
549 def count_host_response(other_user_id: int, response_type: str) -> None:
550 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
551 other_gender = session.execute(select(User.gender).where(User.id == other_user_id)).scalar_one()
552 host_request_responses_counter.labels(user_gender, other_gender, response_type).inc()
553 sent_messages_counter.labels(user_gender, "host request response").inc()
555 host_request = session.execute(
556 where_moderated_content_visible(
557 where_users_column_visible(
558 where_users_column_visible(
559 select(HostRequest),
560 context,
561 HostRequest.initiator_user_id,
562 ),
563 context,
564 HostRequest.recipient_user_id,
565 ),
566 context,
567 HostRequest,
568 is_list_operation=False,
569 ).where(HostRequest.conversation_id == request.host_request_id)
570 ).scalar_one_or_none()
572 if not host_request:
573 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
575 if host_request.initiator_user_id != context.user_id and host_request.recipient_user_id != context.user_id:
576 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
578 if request.status == conversations_pb2.HOST_REQUEST_STATUS_PENDING:
579 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
581 if host_request.end_time < now(): 581 ↛ 582line 581 didn't jump to line 582 because the condition on line 581 was never true
582 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_in_past")
584 control_message = Message(
585 message_type=MessageType.host_request_status_changed,
586 conversation_id=host_request.conversation_id,
587 author_id=context.user_id,
588 )
590 if request.status == conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED:
591 # only host can accept
592 if context.user_id != host_request.recipient_user_id:
593 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "not_the_host")
594 # can't accept a cancelled or confirmed request (only reject), or already accepted
595 if ( 595 ↛ 600line 595 didn't jump to line 600 because the condition on line 595 was never true
596 host_request.status == HostRequestStatus.cancelled
597 or host_request.status == HostRequestStatus.confirmed
598 or host_request.status == HostRequestStatus.accepted
599 ):
600 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
601 _possibly_observe_first_response_time(session, host_request, context.user_id, "accepted")
602 control_message.host_request_status_target = HostRequestStatus.accepted
603 host_request.status = HostRequestStatus.accepted
604 session.flush()
606 recipient_context = make_notification_user_context(user_id=host_request.initiator_user_id)
607 notify(
608 session,
609 user_id=host_request.initiator_user_id,
610 topic_action=NotificationTopicAction.host_request__accept,
611 key=str(host_request.conversation_id),
612 data=notification_data_pb2.HostRequestAccept(
613 host_request=host_request_to_pb(host_request, session, recipient_context),
614 host=user_model_to_pb(host_request.recipient, session, recipient_context),
615 ),
616 moderation_state_id=host_request.moderation_state_id,
617 )
619 count_host_response(host_request.initiator_user_id, "accepted")
620 log_event(
621 context,
622 session,
623 "host_request.accepted",
624 {
625 "host_request_id": host_request.conversation_id,
626 "surfer_id": host_request.initiator_user_id,
627 "host_id": host_request.recipient_user_id,
628 "surfer_gender": host_request.initiator.gender,
629 "host_gender": host_request.recipient.gender,
630 "from_date": str(host_request.from_date),
631 "to_date": str(host_request.to_date),
632 "host_city": host_request.hosting_city,
633 },
634 )
636 if request.status == conversations_pb2.HOST_REQUEST_STATUS_REJECTED:
637 # only host can reject
638 if context.user_id != host_request.recipient_user_id: 638 ↛ 639line 638 didn't jump to line 639 because the condition on line 638 was never true
639 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
640 # can't reject a cancelled or already rejected request
641 if host_request.status == HostRequestStatus.cancelled or host_request.status == HostRequestStatus.rejected: 641 ↛ 642line 641 didn't jump to line 642 because the condition on line 641 was never true
642 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
643 _possibly_observe_first_response_time(session, host_request, context.user_id, "rejected")
644 control_message.host_request_status_target = HostRequestStatus.rejected
645 host_request.status = HostRequestStatus.rejected
646 session.flush()
648 recipient_context = make_notification_user_context(user_id=host_request.initiator_user_id)
649 notify(
650 session,
651 user_id=host_request.initiator_user_id,
652 topic_action=NotificationTopicAction.host_request__reject,
653 key=str(host_request.conversation_id),
654 data=notification_data_pb2.HostRequestReject(
655 host_request=host_request_to_pb(host_request, session, recipient_context),
656 host=user_model_to_pb(host_request.recipient, session, recipient_context),
657 ),
658 moderation_state_id=host_request.moderation_state_id,
659 )
661 count_host_response(host_request.initiator_user_id, "rejected")
663 log_event(
664 context,
665 session,
666 "host_request.rejected",
667 {
668 "host_request_id": host_request.conversation_id,
669 "surfer_id": host_request.initiator_user_id,
670 "host_id": host_request.recipient_user_id,
671 "surfer_gender": host_request.initiator.gender,
672 "host_gender": host_request.recipient.gender,
673 "from_date": str(host_request.from_date),
674 "to_date": str(host_request.to_date),
675 "host_city": host_request.hosting_city,
676 },
677 )
679 if request.status == conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED:
680 # only surfer can confirm
681 if context.user_id != host_request.initiator_user_id:
682 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
683 # can only confirm an accepted request
684 if host_request.status != HostRequestStatus.accepted:
685 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
686 control_message.host_request_status_target = HostRequestStatus.confirmed
687 host_request.status = HostRequestStatus.confirmed
688 session.flush()
690 recipient_context = make_notification_user_context(user_id=host_request.recipient_user_id)
691 notify(
692 session,
693 user_id=host_request.recipient_user_id,
694 topic_action=NotificationTopicAction.host_request__confirm,
695 key=str(host_request.conversation_id),
696 data=notification_data_pb2.HostRequestConfirm(
697 host_request=host_request_to_pb(host_request, session, recipient_context),
698 surfer=user_model_to_pb(host_request.initiator, session, recipient_context),
699 ),
700 moderation_state_id=host_request.moderation_state_id,
701 )
703 count_host_response(host_request.recipient_user_id, "confirmed")
704 log_event(
705 context,
706 session,
707 "host_request.confirmed",
708 {
709 "host_request_id": host_request.conversation_id,
710 "surfer_id": host_request.initiator_user_id,
711 "host_id": host_request.recipient_user_id,
712 "surfer_gender": host_request.initiator.gender,
713 "host_gender": host_request.recipient.gender,
714 "from_date": str(host_request.from_date),
715 "to_date": str(host_request.to_date),
716 "host_city": host_request.hosting_city,
717 },
718 )
720 if request.status == conversations_pb2.HOST_REQUEST_STATUS_CANCELLED:
721 # only surfer can cancel
722 if context.user_id != host_request.initiator_user_id:
723 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
724 # can't' cancel an already cancelled or rejected request
725 if host_request.status == HostRequestStatus.rejected or host_request.status == HostRequestStatus.cancelled: 725 ↛ 726line 725 didn't jump to line 726 because the condition on line 725 was never true
726 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
727 control_message.host_request_status_target = HostRequestStatus.cancelled
728 host_request.status = HostRequestStatus.cancelled
729 session.flush()
731 recipient_context = make_notification_user_context(user_id=host_request.recipient_user_id)
732 notify(
733 session,
734 user_id=host_request.recipient_user_id,
735 topic_action=NotificationTopicAction.host_request__cancel,
736 key=str(host_request.conversation_id),
737 data=notification_data_pb2.HostRequestCancel(
738 host_request=host_request_to_pb(host_request, session, recipient_context),
739 surfer=user_model_to_pb(host_request.initiator, session, recipient_context),
740 ),
741 moderation_state_id=host_request.moderation_state_id,
742 )
744 count_host_response(host_request.recipient_user_id, "cancelled")
745 log_event(
746 context,
747 session,
748 "host_request.cancelled",
749 {
750 "host_request_id": host_request.conversation_id,
751 "surfer_id": host_request.initiator_user_id,
752 "host_id": host_request.recipient_user_id,
753 "surfer_gender": host_request.initiator.gender,
754 "host_gender": host_request.recipient.gender,
755 "from_date": str(host_request.from_date),
756 "to_date": str(host_request.to_date),
757 "host_city": host_request.hosting_city,
758 },
759 )
761 session.add(control_message)
763 if request.text:
764 latest_message = Message(
765 conversation_id=host_request.conversation_id,
766 text=request.text,
767 author_id=context.user_id,
768 message_type=MessageType.text,
769 )
771 session.add(latest_message)
772 else:
773 latest_message = control_message
775 session.flush()
777 if host_request.initiator_user_id == context.user_id:
778 host_request.initiator_last_seen_message_id = latest_message.id
779 else:
780 host_request.recipient_last_seen_message_id = latest_message.id
781 session.commit()
783 return empty_pb2.Empty()
785 def GetHostRequestMessages(
786 self, request: requests_pb2.GetHostRequestMessagesReq, context: CouchersContext, session: Session
787 ) -> requests_pb2.GetHostRequestMessagesRes:
788 host_request = session.execute(
789 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False).where(
790 HostRequest.conversation_id == request.host_request_id
791 )
792 ).scalar_one_or_none()
794 if not host_request: 794 ↛ 795line 794 didn't jump to line 795 because the condition on line 794 was never true
795 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
797 if host_request.initiator_user_id != context.user_id and host_request.recipient_user_id != context.user_id: 797 ↛ 798line 797 didn't jump to line 798 because the condition on line 797 was never true
798 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
800 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
801 pagination = min(pagination, MAX_PAGE_SIZE)
803 messages = (
804 session.execute(
805 select(Message)
806 .where(Message.conversation_id == host_request.conversation_id)
807 .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0)))
808 .order_by(Message.id.desc())
809 .limit(pagination + 1)
810 )
811 .scalars()
812 .all()
813 )
815 no_more = len(messages) <= pagination
817 last_message_id = min(m.id if m else 1 for m in messages[:pagination]) if len(messages) > 0 else 0
819 return requests_pb2.GetHostRequestMessagesRes(
820 last_message_id=last_message_id,
821 no_more=no_more,
822 messages=[message_to_pb(message) for message in messages[:pagination]],
823 )
825 def SendHostRequestMessage(
826 self, request: requests_pb2.SendHostRequestMessageReq, context: CouchersContext, session: Session
827 ) -> empty_pb2.Empty:
828 if request.text == "":
829 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")
830 host_request = session.execute(
831 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False).where(
832 HostRequest.conversation_id == request.host_request_id
833 )
834 ).scalar_one_or_none()
836 if not host_request:
837 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
839 if host_request.initiator_user_id != context.user_id and host_request.recipient_user_id != context.user_id:
840 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
842 if host_request.recipient_user_id == context.user_id:
843 _possibly_observe_first_response_time(session, host_request, context.user_id, "message")
845 message = Message(
846 conversation_id=host_request.conversation_id,
847 author_id=context.user_id,
848 message_type=MessageType.text,
849 text=request.text,
850 )
852 session.add(message)
853 session.flush()
855 if host_request.initiator_user_id == context.user_id:
856 host_request.initiator_last_seen_message_id = message.id
858 recipient_context = make_notification_user_context(user_id=host_request.recipient_user_id)
859 notify(
860 session,
861 user_id=host_request.recipient_user_id,
862 topic_action=NotificationTopicAction.host_request__message,
863 key=str(host_request.conversation_id),
864 data=notification_data_pb2.HostRequestMessage(
865 host_request=host_request_to_pb(host_request, session, recipient_context),
866 user=user_model_to_pb(host_request.initiator, session, recipient_context),
867 text=request.text,
868 am_host=True,
869 ),
870 moderation_state_id=host_request.moderation_state_id,
871 )
873 else:
874 host_request.recipient_last_seen_message_id = message.id
876 recipient_context = make_notification_user_context(user_id=host_request.initiator_user_id)
877 notify(
878 session,
879 user_id=host_request.initiator_user_id,
880 topic_action=NotificationTopicAction.host_request__message,
881 key=str(host_request.conversation_id),
882 data=notification_data_pb2.HostRequestMessage(
883 host_request=host_request_to_pb(host_request, session, recipient_context),
884 user=user_model_to_pb(host_request.recipient, session, recipient_context),
885 text=request.text,
886 am_host=False,
887 ),
888 moderation_state_id=host_request.moderation_state_id,
889 )
891 session.commit()
893 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
894 sent_messages_counter.labels(user_gender, "host request").inc()
895 log_event(
896 context,
897 session,
898 "host_request.message_sent",
899 {
900 "host_request_id": host_request.conversation_id,
901 "surfer_id": host_request.initiator_user_id,
902 "host_id": host_request.recipient_user_id,
903 "role": "host" if context.user_id == host_request.recipient_user_id else "surfer",
904 "host_city": host_request.hosting_city,
905 },
906 )
908 return empty_pb2.Empty()
910 def GetHostRequestUpdates(
911 self, request: requests_pb2.GetHostRequestUpdatesReq, context: CouchersContext, session: Session
912 ) -> requests_pb2.GetHostRequestUpdatesRes:
913 if request.only_sent and request.only_received: 913 ↛ 914line 913 didn't jump to line 914 because the condition on line 913 was never true
914 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received")
916 if request.newest_message_id == 0:
917 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")
919 if not session.execute(select(Message).where(Message.id == request.newest_message_id)).scalar_one_or_none(): 919 ↛ 920line 919 didn't jump to line 920 because the condition on line 919 was never true
920 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")
922 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
923 pagination = min(pagination, MAX_PAGE_SIZE)
925 statement = where_moderated_content_visible(
926 select(
927 Message,
928 HostRequest.status.label("host_request_status"),
929 HostRequest.conversation_id.label("host_request_id"),
930 )
931 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
932 .where(Message.id > request.newest_message_id),
933 context,
934 HostRequest,
935 is_list_operation=False,
936 )
938 if request.only_sent: 938 ↛ 939line 938 didn't jump to line 939 because the condition on line 938 was never true
939 statement = statement.where(HostRequest.initiator_user_id == context.user_id)
940 elif request.only_received: 940 ↛ 941line 940 didn't jump to line 941 because the condition on line 940 was never true
941 statement = statement.where(HostRequest.recipient_user_id == context.user_id)
942 else:
943 statement = statement.where(
944 or_(HostRequest.recipient_user_id == context.user_id, HostRequest.initiator_user_id == context.user_id)
945 )
947 statement = statement.order_by(Message.id.asc()).limit(pagination + 1)
948 res = session.execute(statement).all()
950 no_more = len(res) <= pagination
952 last_message_id = min(m.Message.id if m else 1 for m in res[:pagination]) if len(res) > 0 else 0 # TODO
954 return requests_pb2.GetHostRequestUpdatesRes(
955 no_more=no_more,
956 updates=[
957 requests_pb2.HostRequestUpdate(
958 host_request_id=result.host_request_id,
959 status=hostrequeststatus2api[result.host_request_status],
960 message=message_to_pb(result.Message),
961 )
962 for result in res[:pagination]
963 ],
964 )
966 def MarkLastSeenHostRequest(
967 self, request: requests_pb2.MarkLastSeenHostRequestReq, context: CouchersContext, session: Session
968 ) -> empty_pb2.Empty:
969 host_request = session.execute(
970 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False).where(
971 HostRequest.conversation_id == request.host_request_id
972 )
973 ).scalar_one_or_none()
975 if not host_request: 975 ↛ 976line 975 didn't jump to line 976 because the condition on line 975 was never true
976 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
978 if host_request.initiator_user_id != context.user_id and host_request.recipient_user_id != context.user_id: 978 ↛ 979line 978 didn't jump to line 979 because the condition on line 978 was never true
979 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
981 if host_request.initiator_user_id == context.user_id: 981 ↛ 982line 981 didn't jump to line 982 because the condition on line 981 was never true
982 if not host_request.initiator_last_seen_message_id <= request.last_seen_message_id:
983 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
984 host_request.initiator_last_seen_message_id = request.last_seen_message_id
985 else:
986 if not host_request.recipient_last_seen_message_id <= request.last_seen_message_id:
987 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
988 host_request.recipient_last_seen_message_id = request.last_seen_message_id
990 mark_notifications_seen(
991 session,
992 user_id=context.user_id,
993 key=str(host_request.conversation_id),
994 topic_actions=[
995 NotificationTopicAction.host_request__create,
996 NotificationTopicAction.host_request__accept,
997 NotificationTopicAction.host_request__reject,
998 NotificationTopicAction.host_request__confirm,
999 NotificationTopicAction.host_request__cancel,
1000 NotificationTopicAction.host_request__message,
1001 NotificationTopicAction.host_request__missed_messages,
1002 NotificationTopicAction.host_request__reminder,
1003 ],
1004 )
1006 session.commit()
1007 return empty_pb2.Empty()
1009 def SetHostRequestArchiveStatus(
1010 self, request: requests_pb2.SetHostRequestArchiveStatusReq, context: CouchersContext, session: Session
1011 ) -> requests_pb2.SetHostRequestArchiveStatusRes:
1012 host_request = session.execute(
1013 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False)
1014 .where(HostRequest.conversation_id == request.host_request_id)
1015 .where(
1016 or_(HostRequest.initiator_user_id == context.user_id, HostRequest.recipient_user_id == context.user_id)
1017 )
1018 ).scalar_one_or_none()
1020 if not host_request: 1020 ↛ 1021line 1020 didn't jump to line 1021 because the condition on line 1020 was never true
1021 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
1023 if context.user_id == host_request.initiator_user_id: 1023 ↛ 1026line 1023 didn't jump to line 1026 because the condition on line 1023 was always true
1024 host_request.is_initiator_archived = request.is_archived
1025 else:
1026 host_request.is_recipient_archived = request.is_archived
1028 return requests_pb2.SetHostRequestArchiveStatusRes(
1029 host_request_id=host_request.conversation_id,
1030 is_archived=request.is_archived,
1031 )
1033 def GetResponseRate(
1034 self, request: requests_pb2.GetResponseRateReq, context: CouchersContext, session: Session
1035 ) -> requests_pb2.GetResponseRateRes:
1036 user_res = session.execute(
1037 select(User.id, UserResponseRate)
1038 .outerjoin(UserResponseRate, UserResponseRate.user_id == User.id)
1039 .where(users_visible(context, User))
1040 .where(User.id == request.user_id)
1041 ).one_or_none()
1043 # if user doesn't exist, return None
1044 if not user_res:
1045 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
1047 user, response_rates = user_res
1048 return requests_pb2.GetResponseRateRes(**response_rate_to_pb(response_rates)) # type: ignore[arg-type]
1050 def SendHostRequestFeedback(
1051 self, request: requests_pb2.SendHostRequestFeedbackReq, context: CouchersContext, session: Session
1052 ) -> empty_pb2.Empty:
1053 host_request = session.execute(
1054 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False)
1055 .where(HostRequest.conversation_id == request.host_request_id)
1056 .where(HostRequest.recipient_user_id == context.user_id)
1057 ).scalar_one_or_none()
1059 if not host_request:
1060 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
1062 feedback = session.execute(
1063 select(HostRequestFeedback)
1064 .where(HostRequestFeedback.host_request_id == host_request.conversation_id)
1065 .where(HostRequestFeedback.from_user_id == context.user_id)
1066 ).scalar_one_or_none()
1068 if feedback:
1069 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_left_host_request_feedback")
1071 session.add(
1072 HostRequestFeedback(
1073 host_request_id=host_request.conversation_id,
1074 from_user_id=host_request.recipient_user_id,
1075 to_user_id=host_request.initiator_user_id,
1076 request_quality=hostrequestquality2sql.get(request.host_request_quality),
1077 decline_reason=request.decline_reason,
1078 )
1079 )
1080 quality = hostrequestquality2sql.get(request.host_request_quality)
1081 log_event(
1082 context,
1083 session,
1084 "host_request.feedback_submitted",
1085 {
1086 "host_request_id": host_request.conversation_id,
1087 "surfer_id": host_request.initiator_user_id,
1088 "host_id": host_request.recipient_user_id,
1089 "request_quality": quality.name if quality else None,
1090 "has_decline_reason": bool(request.decline_reason),
1091 "host_city": host_request.hosting_city,
1092 },
1093 )
1095 return empty_pb2.Empty()