Coverage for app/backend/src/couchers/servicers/requests.py: 92%
313 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 09:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 09:44 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy import exists, select
7from sqlalchemy.orm import Session, aliased
8from sqlalchemy.sql import and_, func, or_
10from couchers.constants import HOST_REQUEST_MIN_LENGTH_UTF16
11from couchers.context import CouchersContext
12from couchers.db import can_moderate_node
13from couchers.event_log import log_event
14from couchers.helpers.completed_profile import has_completed_profile
15from couchers.materialized_views import UserResponseRate
16from couchers.metrics import (
17 account_age_on_host_request_create_histogram,
18 host_request_first_response_histogram,
19 host_request_responses_counter,
20 host_requests_sent_counter,
21 sent_messages_counter,
22)
23from couchers.models import (
24 Conversation,
25 HostRequest,
26 HostRequestFeedback,
27 HostRequestQuality,
28 HostRequestStatus,
29 Message,
30 MessageType,
31 ModerationObjectType,
32 RateLimitAction,
33 User,
34)
35from couchers.models.notifications import NotificationTopicAction
36from couchers.models.public_trips import PublicTrip, PublicTripStatus
37from couchers.moderation.utils import create_moderation
38from couchers.notifications.notify import notify
39from couchers.proto import conversations_pb2, notification_data_pb2, requests_pb2, requests_pb2_grpc
40from couchers.rate_limits.check import process_rate_limits_and_check_abort
41from couchers.rate_limits.definitions import RATE_LIMIT_HOURS
42from couchers.servicers.api import response_rate_to_pb, user_model_to_pb
43from couchers.sql import to_bool, users_visible, where_moderated_content_visible, where_users_column_visible
44from couchers.utils import (
45 Timestamp_from_datetime,
46 date_to_api,
47 get_coordinates,
48 now,
49 parse_date,
50 today_in_timezone,
51)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Page size used by the paginated endpoints when the client does not ask for a positive number of items.
DEFAULT_PAGINATION_LENGTH = 10
# Upper bound applied to any page size requested by the client.
MAX_PAGE_SIZE = 50
# Translates the database-side HostRequestStatus enum into the status enum exposed in the API protos.
hostrequeststatus2api = dict(
    [
        (HostRequestStatus.pending, conversations_pb2.HOST_REQUEST_STATUS_PENDING),
        (HostRequestStatus.accepted, conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED),
        (HostRequestStatus.rejected, conversations_pb2.HOST_REQUEST_STATUS_REJECTED),
        (HostRequestStatus.confirmed, conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED),
        (HostRequestStatus.cancelled, conversations_pb2.HOST_REQUEST_STATUS_CANCELLED),
    ]
)
# Translates the API-side HostRequestQuality enum into the database-side HostRequestQuality enum.
# LOW and OKAY previously pointed at each other's database value; each now maps to its namesake.
# NOTE(review): an unspecified quality is stored as high_quality -- confirm this default is intended.
# NOTE(review): rows written while the mapping was swapped may need to be corrected -- verify.
hostrequestquality2sql = {
    requests_pb2.HOST_REQUEST_QUALITY_UNSPECIFIED: HostRequestQuality.high_quality,
    requests_pb2.HOST_REQUEST_QUALITY_LOW: HostRequestQuality.low_quality,
    requests_pb2.HOST_REQUEST_QUALITY_OKAY: HostRequestQuality.okay_quality,
}
def message_to_pb(message: Message) -> conversations_pb2.Message:
    """
    Converts a Message database row into its protocol buffer form.

    A normal message carries its text. Any other message is treated as a control message and carries
    a chat_created or a host_request_status_changed payload, selected by its message_type.
    """
    if message.is_normal_message:
        text_content = conversations_pb2.MessageContentText(text=message.text)
        return conversations_pb2.Message(
            message_id=message.id,
            author_user_id=message.author_id,
            time=Timestamp_from_datetime(message.time),
            text=text_content,
        )

    # Control message: at most one of the two payloads below is set, the other stays None.
    chat_created = None
    if message.message_type == MessageType.chat_created:
        chat_created = conversations_pb2.MessageContentChatCreated()

    status_changed = None
    if message.message_type == MessageType.host_request_status_changed:
        status_changed = conversations_pb2.MessageContentHostRequestStatusChanged(
            status=hostrequeststatus2api[message.host_request_status_target]  # type: ignore[index]
        )

    return conversations_pb2.Message(
        message_id=message.id,
        author_user_id=message.author_id,
        time=Timestamp_from_datetime(message.time),
        chat_created=chat_created,
        host_request_status_changed=status_changed,
    )
def host_request_to_pb(
    host_request: HostRequest, session: Session, context: CouchersContext
) -> requests_pb2.HostRequest:
    """
    Builds the protocol buffer for a host request as seen by the calling user.

    The oldest message of the conversation supplies the creation time and the newest one is embedded
    as latest_message. The last-seen message id and the archived flag are taken from the calling
    user's side of the request.
    """
    conversation_messages = select(Message).where(Message.conversation_id == host_request.conversation_id)

    initial_message = session.execute(conversation_messages.order_by(Message.id.asc()).limit(1)).scalar_one()
    latest_message = session.execute(conversation_messages.order_by(Message.id.desc()).limit(1)).scalar_one()

    lat, lng = get_coordinates(host_request.hosting_location)

    # Feedback is only asked of the host, only for rejected requests, and only while none has been left.
    need_feedback = False
    if host_request.status == HostRequestStatus.rejected and context.user_id == host_request.recipient_user_id:
        feedback_exists = session.execute(
            select(
                exists().where(
                    HostRequestFeedback.from_user_id == context.user_id,
                    HostRequestFeedback.host_request_id == host_request.conversation_id,
                )
            )
        ).scalar_one()
        need_feedback = not feedback_exists

    if context.user_id == host_request.initiator_user_id:
        last_seen_message_id = host_request.initiator_last_seen_message_id
    else:
        last_seen_message_id = host_request.recipient_last_seen_message_id

    if context.user_id == host_request.recipient_user_id:
        is_archived = host_request.is_recipient_archived
    else:
        is_archived = host_request.is_initiator_archived

    return requests_pb2.HostRequest(
        host_request_id=host_request.conversation_id,
        surfer_user_id=host_request.initiator_user_id,
        host_user_id=host_request.recipient_user_id,
        status=hostrequeststatus2api[host_request.status],
        created=Timestamp_from_datetime(initial_message.time),
        from_date=date_to_api(host_request.from_date),
        to_date=date_to_api(host_request.to_date),
        last_seen_message_id=last_seen_message_id,
        latest_message=message_to_pb(latest_message),
        hosting_city=host_request.hosting_city,
        hosting_lat=lat,
        hosting_lng=lng,
        hosting_radius=host_request.hosting_radius,
        need_host_request_feedback=need_feedback,
        is_archived=is_archived,
        public_trip_id=host_request.public_trip_id,
    )
def _possibly_observe_first_response_time(
    session: Session, host_request: HostRequest, user_id: int, response_type: str
) -> None:
    """
    Records how long the host took to respond, but only for the host's first response.

    Must be called before the new response is stored: a first response is detected by the host
    having no messages in the conversation yet. `user_id` has to be the recipient of the request.
    """
    assert host_request.recipient_user_id == user_id

    messages_by_host = session.execute(
        select(func.count())
        .where(Message.conversation_id == host_request.conversation_id)
        .where(Message.author_id == user_id)
    ).scalar_one_or_none()

    if messages_by_host != 0:
        # the host already wrote something (or no count came back), so this is not a first response
        return

    host_gender = session.execute(select(User.gender).where(User.id == host_request.recipient_user_id)).scalar_one()
    surfer_gender = session.execute(
        select(User.gender).where(User.id == host_request.initiator_user_id)
    ).scalar_one()

    labelled_histogram = host_request_first_response_histogram.labels(host_gender, surfer_gender, response_type)
    labelled_histogram.observe((now() - host_request.conversation.created).total_seconds())
def _is_host_request_long_enough(text: str) -> bool:
    """
    Returns whether `text` reaches HOST_REQUEST_MIN_LENGTH_UTF16, measured in UTF-16 code units.

    Python's len(str) counts code points while Javascript's string.length counts UTF-16 code units,
    e.g. len("😀") == 1 but "😀".length == 2. Counting code units keeps this check in line with the
    frontend's validation.
    """
    # utf-16-le is used because it does not prepend a BOM; every code unit is two bytes.
    encoded = text.encode("utf-16-le")
    code_units = len(encoded) // 2
    return code_units >= HOST_REQUEST_MIN_LENGTH_UTF16
193class Requests(requests_pb2_grpc.RequestsServicer):
def CreateHostRequest(
    self, request: requests_pb2.CreateHostRequestReq, context: CouchersContext, session: Session
) -> requests_pb2.CreateHostRequestRes:
    """
    Creates a pending host request from the calling user to `request.host_user_id`.

    Validates, in order: the sender's profile, the recipient, the dates, the text length, the rate
    limit and (if given) the public trip the request is an offer for. Then creates the conversation
    with its first messages, the moderation state and the host request itself, notifies the
    recipient and records metrics and an event.

    Returns the id of the new host request, which is the id of its conversation.
    """
    sender = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
    if not has_completed_profile(session, sender):
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_request")

    if context.user_id == request.host_user_id:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_request_self")

    # the recipient has to exist and be visible to the sender
    recipient_statement = select(User).where(users_visible(context, User)).where(User.id == request.host_user_id)
    recipient = session.execute(recipient_statement).scalar_one_or_none()
    if not recipient:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    from_date = parse_date(request.from_date)
    to_date = parse_date(request.to_date)
    if not (from_date and to_date):
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_date")

    # "today" is evaluated in the recipient's timezone
    today = today_in_timezone(recipient.timezone)
    one_year = timedelta(days=365)

    # the stay may not start in the past
    if today > from_date:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_before_today")

    # the stay has to end strictly after it starts; together with the check above this also
    # guarantees that to_date is not in the past
    if not from_date < to_date:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_to")

    if from_date - today > one_year:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_one_year")

    if to_date - from_date > one_year:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_to_after_one_year")

    # minimum length of the request text
    if not _is_host_request_long_enough(request.text):
        context.abort_with_error_code(
            grpc.StatusCode.INVALID_ARGUMENT,
            "host_request_too_short2",
            substitutions={"count": HOST_REQUEST_MIN_LENGTH_UTF16},
        )

    # has the sender been sending host requests excessively?
    rate_limited = process_rate_limits_and_check_abort(
        session=session, user_id=context.user_id, action=RateLimitAction.host_request
    )
    if rate_limited:
        context.abort_with_error_code(
            grpc.StatusCode.RESOURCE_EXHAUSTED,
            "host_request_rate_limit2",
            substitutions={"count": RATE_LIMIT_HOURS},
        )

    # a request that names a public trip is an offer in response to that trip and is validated against it
    public_trip_id = None
    if request.HasField("public_trip_id"):
        public_trip_id = request.public_trip_id

    if public_trip_id is not None:
        public_trip = session.execute(
            select(PublicTrip).where(PublicTrip.id == public_trip_id)
        ).scalar_one_or_none()
        if not public_trip:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "public_trip_not_found")
        # roles are reversed here: the traveler who posted the trip is the recipient of this request
        if public_trip.user_id != recipient.id:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "public_trip_user_mismatch")
        # the trip has to still be looking for a host
        if public_trip.status != PublicTripStatus.searching_for_host:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "public_trip_not_active")
        # the offered dates may shorten the trip's window but not extend it
        if from_date < public_trip.from_date or to_date > public_trip.to_date:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "public_trip_dates_out_of_range")
        # same_gender_only is enforced unless the sender can moderate the trip's node
        if public_trip.same_gender_only and not can_moderate_node(session, context.user_id, public_trip.node_id):
            if sender.gender != recipient.gender:
                context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "public_trip_same_gender_only")
        # only one offer per sender and trip
        previous_offer = session.execute(
            select(HostRequest)
            .where(HostRequest.public_trip_id == public_trip_id)
            .where(HostRequest.initiator_user_id == context.user_id)
        ).scalar_one_or_none()
        if previous_offer:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "duplicate_host_request_for_trip")

    conversation = Conversation()
    session.add(conversation)
    session.flush()

    # the conversation starts with a chat_created control message followed by the request text
    creation_marker = Message(
        conversation_id=conversation.id,
        author_id=context.user_id,
        message_type=MessageType.chat_created,
    )
    session.add(creation_marker)

    message = Message(
        conversation_id=conversation.id,
        author_id=context.user_id,
        text=request.text,
        message_type=MessageType.text,
    )
    session.add(message)
    session.flush()

    # Create moderation state for UMS (starts as SHADOWED)
    moderation_state = create_moderation(
        session=session,
        object_type=ModerationObjectType.host_request,
        object_id=conversation.id,
        creator_user_id=context.user_id,
    )

    host_request = HostRequest(
        conversation_id=conversation.id,
        initiator_user_id=context.user_id,
        recipient_user_id=recipient.id,
        moderation_state_id=moderation_state.id,
        from_date=from_date,
        to_date=to_date,
        status=HostRequestStatus.pending,
        initiator_last_seen_message_id=message.id,
        # TODO: tz
        # timezone=recipient.timezone,
        hosting_city=recipient.city,
        hosting_location=recipient.geom,
        hosting_radius=recipient.geom_radius,
        public_trip_id=public_trip_id,
    )
    session.add(host_request)
    session.flush()

    notify(
        session,
        user_id=host_request.recipient_user_id,
        topic_action=NotificationTopicAction.host_request__create,
        key=str(host_request.conversation_id),
        data=notification_data_pb2.HostRequestCreate(
            host_request=host_request_to_pb(host_request, session, context),
            surfer=user_model_to_pb(host_request.initiator, session, context),
            text=request.text,
        ),
        moderation_state_id=moderation_state.id,
    )

    host_requests_sent_counter.labels(sender.gender, recipient.gender).inc()
    sent_messages_counter.labels(sender.gender, "host request send").inc()
    account_age_histogram = account_age_on_host_request_create_histogram.labels(sender.gender, recipient.gender)
    account_age_histogram.observe((now() - sender.joined).total_seconds())

    event_payload = {
        "host_request_id": host_request.conversation_id,
        "host_id": recipient.id,
        "surfer_gender": sender.gender,
        "host_gender": recipient.gender,
        "city": recipient.city,
        "from_date": str(from_date),
        "to_date": str(to_date),
        "nights": (to_date - from_date).days,
    }
    log_event(context, session, "host_request.created", event_payload)

    return requests_pb2.CreateHostRequestRes(host_request_id=host_request.conversation_id)
def GetHostRequest(
    self, request: requests_pb2.GetHostRequestReq, context: CouchersContext, session: Session
) -> requests_pb2.HostRequest:
    """
    Returns a single host request that the calling user is the initiator or the recipient of.

    The lookup goes through the user-visibility filters for both participants and the
    moderated-content visibility filter. Aborts with NOT_FOUND if nothing matches.
    """
    statement = select(HostRequest)
    statement = where_users_column_visible(statement, context, HostRequest.initiator_user_id)
    statement = where_users_column_visible(statement, context, HostRequest.recipient_user_id)
    statement = where_moderated_content_visible(statement, context, HostRequest, is_list_operation=False)

    caller_is_participant = or_(
        HostRequest.initiator_user_id == context.user_id, HostRequest.recipient_user_id == context.user_id
    )
    statement = statement.where(HostRequest.conversation_id == request.host_request_id).where(caller_is_participant)

    host_request = session.execute(statement).scalar_one_or_none()
    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    return host_request_to_pb(host_request, session, context)
def ListHostRequests(
    self, request: requests_pb2.ListHostRequestsReq, context: CouchersContext, session: Session
) -> requests_pb2.ListHostRequestsRes:
    """
    Lists host requests the calling user takes part in, ordered by their latest message, newest first.

    Filters:
    - only_sent / only_received restrict to requests the caller initiated / received; setting both
      aborts with INVALID_ARGUMENT.
    - only_archived is applied only when neither of the two above is set, and matches the archived
      flag of the caller's own side of the request.
    - only_active keeps pending, accepted and confirmed requests that have not ended yet.

    Pagination is keyed on the id of the latest message: pass the returned last_request_id back as
    request.last_request_id to get the next page. The page size is request.number, defaulting to
    DEFAULT_PAGINATION_LENGTH and capped at MAX_PAGE_SIZE.
    """
    if request.only_sent and request.only_received:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received")

    pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
    pagination = min(pagination, MAX_PAGE_SIZE)

    # By outer joining messages on itself where the second id is bigger, only the highest IDs will have
    # none as message_2.id. So just filter for these to get the highest messages only.
    # See https://stackoverflow.com/a/27802817/6115336
    message_2 = aliased(Message)
    statement = where_moderated_content_visible(
        where_users_column_visible(
            where_users_column_visible(
                select(Message, HostRequest, Conversation)
                .outerjoin(
                    message_2, and_(Message.conversation_id == message_2.conversation_id, Message.id < message_2.id)
                )
                .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
                .join(Conversation, Conversation.id == HostRequest.conversation_id),
                context,
                HostRequest.initiator_user_id,
            ),
            context,
            HostRequest.recipient_user_id,
        ),
        context,
        HostRequest,
        is_list_operation=True,
    ).where(message_2.id.is_(None))

    if request.last_request_id != 0:
        statement = statement.where(Message.id < request.last_request_id)
    if request.only_sent:
        statement = statement.where(HostRequest.initiator_user_id == context.user_id)
    elif request.only_received:
        statement = statement.where(HostRequest.recipient_user_id == context.user_id)
    elif request.HasField("only_archived"):
        statement = statement.where(
            or_(
                and_(
                    HostRequest.initiator_user_id == context.user_id,
                    HostRequest.is_initiator_archived == request.only_archived,
                ),
                and_(
                    HostRequest.recipient_user_id == context.user_id,
                    HostRequest.is_recipient_archived == request.only_archived,
                ),
            )
        )
    else:
        statement = statement.where(
            or_(HostRequest.recipient_user_id == context.user_id, HostRequest.initiator_user_id == context.user_id)
        )

    # TODO: I considered having the latest control message be the single source of truth for
    # the HostRequest.status, but decided against it because of this filter.
    # Another possibility is to filter in the python instead of SQL, but that's slower
    if request.only_active:
        statement = statement.where(
            or_(
                HostRequest.status == HostRequestStatus.pending,
                HostRequest.status == HostRequestStatus.accepted,
                HostRequest.status == HostRequestStatus.confirmed,
            )
        )
        # An active request has not ended yet. This mirrors RespondHostRequest, which treats
        # end_time < now() as "in the past"; the previous `<=` kept only requests that were over.
        statement = statement.where(HostRequest.end_time >= func.now())

    # one extra row is fetched to find out whether there is another page
    statement = statement.order_by(Message.id.desc()).limit(pagination + 1)
    results = session.execute(statement).all()
    page = results[:pagination]

    host_requests = []
    for result in page:
        lat, lng = get_coordinates(result.HostRequest.hosting_location)
        host_requests.append(
            requests_pb2.HostRequest(
                host_request_id=result.HostRequest.conversation_id,
                surfer_user_id=result.HostRequest.initiator_user_id,
                host_user_id=result.HostRequest.recipient_user_id,
                status=hostrequeststatus2api[result.HostRequest.status],
                created=Timestamp_from_datetime(result.Conversation.created),
                from_date=date_to_api(result.HostRequest.from_date),
                to_date=date_to_api(result.HostRequest.to_date),
                last_seen_message_id=(
                    result.HostRequest.initiator_last_seen_message_id
                    if context.user_id == result.HostRequest.initiator_user_id
                    else result.HostRequest.recipient_last_seen_message_id
                ),
                latest_message=message_to_pb(result.Message),
                hosting_city=result.HostRequest.hosting_city,
                hosting_lat=lat,
                hosting_lng=lng,
                hosting_radius=result.HostRequest.hosting_radius,
            )
        )

    # the cursor is only set when there is a further page to fetch
    last_request_id = min(g.Message.id for g in page) if len(results) > pagination else 0  # TODO
    no_more = len(results) <= pagination

    return requests_pb2.ListHostRequestsRes(
        last_request_id=last_request_id, no_more=no_more, host_requests=host_requests
    )
def RespondHostRequest(
    self, request: requests_pb2.RespondHostRequestReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Changes the status of a host request on behalf of one of its two participants.

    Only the host (recipient) may accept or reject, only the surfer (initiator) may confirm or
    cancel, and nobody may set a request back to pending. Every change adds a control message to the
    conversation, notifies the other participant and records metrics and an event. A non-empty
    request.text is added as a normal message after the control message. The caller's last-seen
    message id is moved to the newest message that was added.
    """

    def count_host_response(other_user_id: int, response_type: str) -> None:
        # bumps the response counter and the sent-message counter, labelled by gender
        own_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
        their_gender = session.execute(select(User.gender).where(User.id == other_user_id)).scalar_one()
        host_request_responses_counter.labels(own_gender, their_gender, response_type).inc()
        sent_messages_counter.labels(own_gender, "host request response").inc()

    def status_event_payload() -> dict:
        # payload shared by all host_request.<status> events
        return {
            "host_request_id": host_request.conversation_id,
            "surfer_id": host_request.initiator_user_id,
            "host_id": host_request.recipient_user_id,
            "surfer_gender": host_request.initiator.gender,
            "host_gender": host_request.recipient.gender,
            "from_date": str(host_request.from_date),
            "to_date": str(host_request.to_date),
            "host_city": host_request.hosting_city,
        }

    statement = select(HostRequest)
    statement = where_users_column_visible(statement, context, HostRequest.initiator_user_id)
    statement = where_users_column_visible(statement, context, HostRequest.recipient_user_id)
    statement = where_moderated_content_visible(statement, context, HostRequest, is_list_operation=False)
    statement = statement.where(HostRequest.conversation_id == request.host_request_id)
    host_request = session.execute(statement).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    # non-participants get the same error as for a request that does not exist
    if context.user_id not in (host_request.initiator_user_id, host_request.recipient_user_id):
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    target_status = request.status

    if target_status == conversations_pb2.HOST_REQUEST_STATUS_PENDING:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")

    if host_request.end_time < now():
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_in_past")

    control_message = Message(
        message_type=MessageType.host_request_status_changed,
        conversation_id=host_request.conversation_id,
        author_id=context.user_id,
    )

    caller_is_host = context.user_id == host_request.recipient_user_id
    caller_is_surfer = context.user_id == host_request.initiator_user_id

    if target_status == conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED:
        # only host can accept
        if not caller_is_host:
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "not_the_host")
        # can't accept a cancelled or confirmed request (only reject), or already accepted
        if host_request.status in (
            HostRequestStatus.cancelled,
            HostRequestStatus.confirmed,
            HostRequestStatus.accepted,
        ):
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
        _possibly_observe_first_response_time(session, host_request, context.user_id, "accepted")
        control_message.host_request_status_target = HostRequestStatus.accepted
        host_request.status = HostRequestStatus.accepted
        session.flush()

        notify(
            session,
            user_id=host_request.initiator_user_id,
            topic_action=NotificationTopicAction.host_request__accept,
            key=str(host_request.conversation_id),
            data=notification_data_pb2.HostRequestAccept(
                host_request=host_request_to_pb(host_request, session, context),
                host=user_model_to_pb(host_request.recipient, session, context),
            ),
            moderation_state_id=host_request.moderation_state_id,
        )

        count_host_response(host_request.initiator_user_id, "accepted")
        log_event(context, session, "host_request.accepted", status_event_payload())

    elif target_status == conversations_pb2.HOST_REQUEST_STATUS_REJECTED:
        # only host can reject
        if not caller_is_host:
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
        # can't reject a cancelled or already rejected request
        if host_request.status in (HostRequestStatus.cancelled, HostRequestStatus.rejected):
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
        _possibly_observe_first_response_time(session, host_request, context.user_id, "rejected")
        control_message.host_request_status_target = HostRequestStatus.rejected
        host_request.status = HostRequestStatus.rejected
        session.flush()

        notify(
            session,
            user_id=host_request.initiator_user_id,
            topic_action=NotificationTopicAction.host_request__reject,
            key=str(host_request.conversation_id),
            data=notification_data_pb2.HostRequestReject(
                host_request=host_request_to_pb(host_request, session, context),
                host=user_model_to_pb(host_request.recipient, session, context),
            ),
            moderation_state_id=host_request.moderation_state_id,
        )

        count_host_response(host_request.initiator_user_id, "rejected")
        log_event(context, session, "host_request.rejected", status_event_payload())

    elif target_status == conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED:
        # only surfer can confirm
        if not caller_is_surfer:
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
        # can only confirm an accepted request
        if host_request.status != HostRequestStatus.accepted:
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
        control_message.host_request_status_target = HostRequestStatus.confirmed
        host_request.status = HostRequestStatus.confirmed
        session.flush()

        notify(
            session,
            user_id=host_request.recipient_user_id,
            topic_action=NotificationTopicAction.host_request__confirm,
            key=str(host_request.conversation_id),
            data=notification_data_pb2.HostRequestConfirm(
                host_request=host_request_to_pb(host_request, session, context),
                surfer=user_model_to_pb(host_request.initiator, session, context),
            ),
            moderation_state_id=host_request.moderation_state_id,
        )

        count_host_response(host_request.recipient_user_id, "confirmed")
        log_event(context, session, "host_request.confirmed", status_event_payload())

    elif target_status == conversations_pb2.HOST_REQUEST_STATUS_CANCELLED:
        # only surfer can cancel
        if not caller_is_surfer:
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
        # can't cancel an already cancelled or rejected request
        if host_request.status in (HostRequestStatus.rejected, HostRequestStatus.cancelled):
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
        control_message.host_request_status_target = HostRequestStatus.cancelled
        host_request.status = HostRequestStatus.cancelled
        session.flush()

        notify(
            session,
            user_id=host_request.recipient_user_id,
            topic_action=NotificationTopicAction.host_request__cancel,
            key=str(host_request.conversation_id),
            data=notification_data_pb2.HostRequestCancel(
                host_request=host_request_to_pb(host_request, session, context),
                surfer=user_model_to_pb(host_request.initiator, session, context),
            ),
            moderation_state_id=host_request.moderation_state_id,
        )

        count_host_response(host_request.recipient_user_id, "cancelled")
        log_event(context, session, "host_request.cancelled", status_event_payload())

    session.add(control_message)

    # an accompanying text, if any, becomes the newest message of the conversation
    latest_message = control_message
    if request.text:
        latest_message = Message(
            conversation_id=host_request.conversation_id,
            text=request.text,
            author_id=context.user_id,
            message_type=MessageType.text,
        )
        session.add(latest_message)

    # flush so that latest_message has an id
    session.flush()

    if caller_is_surfer:
        host_request.initiator_last_seen_message_id = latest_message.id
    else:
        host_request.recipient_last_seen_message_id = latest_message.id
    session.commit()

    return empty_pb2.Empty()
def GetHostRequestMessages(
    self, request: requests_pb2.GetHostRequestMessagesReq, context: CouchersContext, session: Session
) -> requests_pb2.GetHostRequestMessagesRes:
    """
    Returns one page of messages of a host request, newest first.

    Only messages with an id below request.last_message_id are returned; 0 means "start from the
    newest". The page size is request.number, defaulting to DEFAULT_PAGINATION_LENGTH and capped at
    MAX_PAGE_SIZE. Aborts with NOT_FOUND if the request is not visible or the caller is not one of
    its two participants.
    """
    lookup = where_moderated_content_visible(
        select(HostRequest), context, HostRequest, is_list_operation=False
    ).where(HostRequest.conversation_id == request.host_request_id)
    host_request = session.execute(lookup).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    if context.user_id not in (host_request.initiator_user_id, host_request.recipient_user_id):
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    pagination = min(request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH, MAX_PAGE_SIZE)

    # one extra row is fetched to find out whether there are more messages after this page
    page_statement = (
        select(Message)
        .where(Message.conversation_id == host_request.conversation_id)
        .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0)))
        .order_by(Message.id.desc())
        .limit(pagination + 1)
    )
    fetched = session.execute(page_statement).scalars().all()
    page = fetched[:pagination]

    no_more = len(fetched) <= pagination

    # cursor for the next call: the smallest id on this page, or 0 if there were no messages
    if fetched:
        last_message_id = min(m.id if m else 1 for m in page)
    else:
        last_message_id = 0

    return requests_pb2.GetHostRequestMessagesRes(
        last_message_id=last_message_id,
        no_more=no_more,
        messages=[message_to_pb(m) for m in page],
    )
def SendHostRequestMessage(
    self, request: requests_pb2.SendHostRequestMessageReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Adds a text message by the calling user to the conversation of a host request.

    Aborts with INVALID_ARGUMENT for an empty text and with NOT_FOUND if the request is not visible
    or the caller is not one of its two participants. The sender's last-seen message id is moved to
    the new message and the other participant is notified.
    """
    if request.text == "":
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

    lookup = where_moderated_content_visible(
        select(HostRequest), context, HostRequest, is_list_operation=False
    ).where(HostRequest.conversation_id == request.host_request_id)
    host_request = session.execute(lookup).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    if context.user_id not in (host_request.initiator_user_id, host_request.recipient_user_id):
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    # has to run before the message is stored, it looks for earlier messages by the host
    if host_request.recipient_user_id == context.user_id:
        _possibly_observe_first_response_time(session, host_request, context.user_id, "message")

    message = Message(
        conversation_id=host_request.conversation_id,
        author_id=context.user_id,
        message_type=MessageType.text,
        text=request.text,
    )

    session.add(message)
    session.flush()

    sender_is_surfer = host_request.initiator_user_id == context.user_id

    if sender_is_surfer:
        host_request.initiator_last_seen_message_id = message.id
    else:
        host_request.recipient_last_seen_message_id = message.id

    # The notification goes to the other participant. `user` is the sender, and am_host says whether
    # the notified user is the host of this request.
    notify(
        session,
        user_id=host_request.recipient_user_id if sender_is_surfer else host_request.initiator_user_id,
        topic_action=NotificationTopicAction.host_request__message,
        key=str(host_request.conversation_id),
        data=notification_data_pb2.HostRequestMessage(
            host_request=host_request_to_pb(host_request, session, context),
            user=user_model_to_pb(
                host_request.initiator if sender_is_surfer else host_request.recipient, session, context
            ),
            text=request.text,
            am_host=sender_is_surfer,
        ),
        moderation_state_id=host_request.moderation_state_id,
    )

    session.commit()

    sender_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
    sent_messages_counter.labels(sender_gender, "host request").inc()

    event_payload = {
        "host_request_id": host_request.conversation_id,
        "surfer_id": host_request.initiator_user_id,
        "host_id": host_request.recipient_user_id,
        "role": "host" if context.user_id == host_request.recipient_user_id else "surfer",
        "host_city": host_request.hosting_city,
    }
    log_event(context, session, "host_request.message_sent", event_payload)

    return empty_pb2.Empty()
def GetHostRequestUpdates(
    self, request: requests_pb2.GetHostRequestUpdatesReq, context: CouchersContext, session: Session
) -> requests_pb2.GetHostRequestUpdatesRes:
    """
    Return messages newer than `request.newest_message_id` from host requests the caller takes part in.

    Results are ordered by ascending message id and capped at `request.number` items (falling back to
    DEFAULT_PAGINATION_LENGTH when it is not positive, and never more than MAX_PAGE_SIZE). `no_more` is set when
    no further messages exist beyond the returned page.

    Aborts with INVALID_ARGUMENT when both `only_sent` and `only_received` are set, when `newest_message_id` is
    0, or when no message with that id exists.
    """
    if request.only_sent and request.only_received:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received")

    if request.newest_message_id == 0:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

    if not session.execute(select(Message).where(Message.id == request.newest_message_id)).scalar_one_or_none():
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

    pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
    pagination = min(pagination, MAX_PAGE_SIZE)

    # NOTE(review): this query returns many rows but passes is_list_operation=False — confirm that is intended.
    statement = where_moderated_content_visible(
        select(
            Message,
            HostRequest.status.label("host_request_status"),
            HostRequest.conversation_id.label("host_request_id"),
        )
        .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
        .where(Message.id > request.newest_message_id),
        context,
        HostRequest,
        is_list_operation=False,
    )

    # Restrict to host requests the caller sent, received, or either.
    if request.only_sent:
        statement = statement.where(HostRequest.initiator_user_id == context.user_id)
    elif request.only_received:
        statement = statement.where(HostRequest.recipient_user_id == context.user_id)
    else:
        statement = statement.where(
            or_(HostRequest.recipient_user_id == context.user_id, HostRequest.initiator_user_id == context.user_id)
        )

    # Fetch one row more than the page size so we can tell whether another page exists.
    statement = statement.order_by(Message.id.asc()).limit(pagination + 1)
    res = session.execute(statement).all()

    no_more = len(res) <= pagination

    return requests_pb2.GetHostRequestUpdatesRes(
        no_more=no_more,
        updates=[
            requests_pb2.HostRequestUpdate(
                host_request_id=result.host_request_id,
                status=hostrequeststatus2api[result.host_request_status],
                message=message_to_pb(result.Message),
            )
            for result in res[:pagination]
        ],
    )
def MarkLastSeenHostRequest(
    self, request: requests_pb2.MarkLastSeenHostRequestReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Advance the caller's "last seen message" marker on a host request.

    Aborts with NOT_FOUND ("host_request_not_found") when the host request is not returned by the
    moderated-visibility query or the caller is not a participant, and with FAILED_PRECONDITION
    ("cant_unsee_messages") when the new marker would be lower than the stored one.
    """
    lookup = where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False)
    host_request = session.execute(
        lookup.where(HostRequest.conversation_id == request.host_request_id)
    ).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    if context.user_id not in (host_request.initiator_user_id, host_request.recipient_user_id):
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    # Each participant has their own marker; it may only stay the same or move forward.
    if host_request.initiator_user_id == context.user_id:
        if host_request.initiator_last_seen_message_id > request.last_seen_message_id:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
        host_request.initiator_last_seen_message_id = request.last_seen_message_id
    else:
        if host_request.recipient_last_seen_message_id > request.last_seen_message_id:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
        host_request.recipient_last_seen_message_id = request.last_seen_message_id

    session.commit()
    return empty_pb2.Empty()
def SetHostRequestArchiveStatus(
    self, request: requests_pb2.SetHostRequestArchiveStatusReq, context: CouchersContext, session: Session
) -> requests_pb2.SetHostRequestArchiveStatusRes:
    """
    Set the caller's own archived flag on a host request and echo the new state back.

    Aborts with NOT_FOUND ("host_request_not_found") when no matching host request with the caller as
    initiator or recipient is returned by the moderated-visibility query.
    """
    caller_is_participant = or_(
        HostRequest.initiator_user_id == context.user_id,
        HostRequest.recipient_user_id == context.user_id,
    )
    statement = (
        where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False)
        .where(HostRequest.conversation_id == request.host_request_id)
        .where(caller_is_participant)
    )
    host_request = session.execute(statement).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    # The archive flag is tracked separately for each side of the request.
    if context.user_id == host_request.initiator_user_id:
        host_request.is_initiator_archived = request.is_archived
    else:
        host_request.is_recipient_archived = request.is_archived

    return requests_pb2.SetHostRequestArchiveStatusRes(
        host_request_id=host_request.conversation_id,
        is_archived=request.is_archived,
    )
def GetResponseRate(
    self, request: requests_pb2.GetResponseRateReq, context: CouchersContext, session: Session
) -> requests_pb2.GetResponseRateRes:
    """
    Return the response-rate data for the user given by `request.user_id`.

    Aborts with NOT_FOUND ("user_not_found") when no user with that id passes the `users_visible` filter.
    """
    statement = (
        select(User.id, UserResponseRate)
        # Outer join: a visible user without a UserResponseRate row still yields a result row.
        .outerjoin(UserResponseRate, UserResponseRate.user_id == User.id)
        .where(users_visible(context, User))
        .where(User.id == request.user_id)
    )
    row = session.execute(statement).one_or_none()

    if not row:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    # Only the response-rate half of the row is needed; the user id merely proves the user exists.
    _, response_rates = row
    return requests_pb2.GetResponseRateRes(**response_rate_to_pb(response_rates))  # type: ignore[arg-type]
def SendHostRequestFeedback(
    self, request: requests_pb2.SendHostRequestFeedbackReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Record the recipient's feedback about a host request they received.

    Aborts with NOT_FOUND ("host_request_not_found") when no matching host request with the caller as recipient
    is returned by the moderated-visibility query, and with FAILED_PRECONDITION
    ("already_left_host_request_feedback") when the caller already has a feedback row for it.
    """
    host_request_statement = (
        where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False)
        .where(HostRequest.conversation_id == request.host_request_id)
        .where(HostRequest.recipient_user_id == context.user_id)
    )
    host_request = session.execute(host_request_statement).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    existing_feedback = session.execute(
        select(HostRequestFeedback)
        .where(HostRequestFeedback.host_request_id == host_request.conversation_id)
        .where(HostRequestFeedback.from_user_id == context.user_id)
    ).scalar_one_or_none()

    if existing_feedback:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_left_host_request_feedback")

    # .get() yields None for a quality value missing from the mapping.
    quality = hostrequestquality2sql.get(request.host_request_quality)

    new_feedback = HostRequestFeedback(
        host_request_id=host_request.conversation_id,
        from_user_id=host_request.recipient_user_id,
        to_user_id=host_request.initiator_user_id,
        request_quality=quality,
        decline_reason=request.decline_reason,
    )
    session.add(new_feedback)

    event_payload = {
        "host_request_id": host_request.conversation_id,
        "surfer_id": host_request.initiator_user_id,
        "host_id": host_request.recipient_user_id,
        "request_quality": quality.name if quality else None,
        "has_decline_reason": bool(request.decline_reason),
        "host_city": host_request.hosting_city,
    }
    log_event(context, session, "host_request.feedback_submitted", event_payload)

    return empty_pb2.Empty()