Coverage for app / backend / src / couchers / servicers / requests.py: 91%
295 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy import exists, select
7from sqlalchemy.orm import Session, aliased
8from sqlalchemy.sql import and_, func, or_
10from couchers.constants import HOST_REQUEST_MIN_LENGTH_UTF16
11from couchers.context import CouchersContext
12from couchers.event_log import log_event
13from couchers.helpers.completed_profile import has_completed_profile
14from couchers.materialized_views import UserResponseRate
15from couchers.metrics import (
16 account_age_on_host_request_create_histogram,
17 host_request_first_response_histogram,
18 host_request_responses_counter,
19 host_requests_sent_counter,
20 sent_messages_counter,
21)
22from couchers.models import (
23 Conversation,
24 HostRequest,
25 HostRequestFeedback,
26 HostRequestQuality,
27 HostRequestStatus,
28 Message,
29 MessageType,
30 ModerationObjectType,
31 RateLimitAction,
32 User,
33)
34from couchers.models.notifications import NotificationTopicAction
35from couchers.moderation.utils import create_moderation
36from couchers.notifications.notify import notify
37from couchers.proto import conversations_pb2, notification_data_pb2, requests_pb2, requests_pb2_grpc
38from couchers.rate_limits.check import process_rate_limits_and_check_abort
39from couchers.rate_limits.definitions import RATE_LIMIT_HOURS
40from couchers.servicers.api import response_rate_to_pb, user_model_to_pb
41from couchers.sql import to_bool, users_visible, where_moderated_content_visible, where_users_column_visible
42from couchers.utils import (
43 Timestamp_from_datetime,
44 date_to_api,
45 get_coordinates,
46 now,
47 parse_date,
48 today_in_timezone,
49)
logger = logging.getLogger(__name__)

# Page size used when a request does not specify a positive number of items.
DEFAULT_PAGINATION_LENGTH = 10
# Upper bound applied to the page size of paginated calls in this module.
MAX_PAGE_SIZE = 50


# Maps the database host request status enum to the corresponding protobuf status value.
hostrequeststatus2api = {
    HostRequestStatus.pending: conversations_pb2.HOST_REQUEST_STATUS_PENDING,
    HostRequestStatus.accepted: conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
    HostRequestStatus.rejected: conversations_pb2.HOST_REQUEST_STATUS_REJECTED,
    HostRequestStatus.confirmed: conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED,
    HostRequestStatus.cancelled: conversations_pb2.HOST_REQUEST_STATUS_CANCELLED,
}
# Maps the protobuf host request quality value to the database enum.
# Each named quality maps to the enum member of the same name; an unspecified quality maps to high quality.
hostrequestquality2sql = {
    requests_pb2.HOST_REQUEST_QUALITY_UNSPECIFIED: HostRequestQuality.high_quality,
    requests_pb2.HOST_REQUEST_QUALITY_LOW: HostRequestQuality.low_quality,
    requests_pb2.HOST_REQUEST_QUALITY_OKAY: HostRequestQuality.okay_quality,
}
def message_to_pb(message: Message) -> conversations_pb2.Message:
    """
    Serialise a database message into its protocol buffer form.

    Normal messages carry their text. Any other message is a control message and carries either the
    "chat created" marker or the host request status it changed to, depending on its message type.
    """
    # Fields shared by every kind of message
    message_id = message.id
    author_id = message.author_id
    sent_at = Timestamp_from_datetime(message.time)

    if message.is_normal_message:
        return conversations_pb2.Message(
            message_id=message_id,
            author_user_id=author_id,
            time=sent_at,
            text=conversations_pb2.MessageContentText(text=message.text),
        )

    # Control message: at most one of the two payloads below is set
    chat_created = None
    if message.message_type == MessageType.chat_created:
        chat_created = conversations_pb2.MessageContentChatCreated()

    status_changed = None
    if message.message_type == MessageType.host_request_status_changed:
        status_changed = conversations_pb2.MessageContentHostRequestStatusChanged(
            status=hostrequeststatus2api[message.host_request_status_target]  # type: ignore[index]
        )

    return conversations_pb2.Message(
        message_id=message_id,
        author_user_id=author_id,
        time=sent_at,
        chat_created=chat_created,
        host_request_status_changed=status_changed,
    )
def host_request_to_pb(
    host_request: HostRequest, session: Session, context: CouchersContext
) -> requests_pb2.HostRequest:
    """
    Build the protocol buffer for a host request as seen by the calling user.

    The creation time is taken from the first message of the conversation and the latest message from the
    last one. The last-seen message ID and the archived flag are chosen according to whether the caller is
    the surfer or the host. Feedback is flagged as needed only for a host looking at a rejected request for
    which they have not yet left feedback.
    """
    conversation_messages = select(Message).where(Message.conversation_id == host_request.conversation_id)

    first_message = session.execute(conversation_messages.order_by(Message.id.asc()).limit(1)).scalar_one()
    newest_message = session.execute(conversation_messages.order_by(Message.id.desc()).limit(1)).scalar_one()

    lat, lng = get_coordinates(host_request.hosting_location)

    if context.user_id == host_request.host_user_id and host_request.status == HostRequestStatus.rejected:
        feedback_exists = session.execute(
            select(
                exists().where(
                    HostRequestFeedback.from_user_id == context.user_id,
                    HostRequestFeedback.host_request_id == host_request.conversation_id,
                )
            )
        ).scalar_one()
        need_feedback = not feedback_exists
    else:
        need_feedback = False

    # Per-user fields depend on which side of the request the caller is on
    if context.user_id == host_request.surfer_user_id:
        last_seen_message_id = host_request.surfer_last_seen_message_id
    else:
        last_seen_message_id = host_request.host_last_seen_message_id

    if context.user_id == host_request.host_user_id:
        is_archived = host_request.is_host_archived
    else:
        is_archived = host_request.is_surfer_archived

    return requests_pb2.HostRequest(
        host_request_id=host_request.conversation_id,
        surfer_user_id=host_request.surfer_user_id,
        host_user_id=host_request.host_user_id,
        status=hostrequeststatus2api[host_request.status],
        created=Timestamp_from_datetime(first_message.time),
        from_date=date_to_api(host_request.from_date),
        to_date=date_to_api(host_request.to_date),
        last_seen_message_id=last_seen_message_id,
        latest_message=message_to_pb(newest_message),
        hosting_city=host_request.hosting_city,
        hosting_lat=lat,
        hosting_lng=lng,
        hosting_radius=host_request.hosting_radius,
        need_host_request_feedback=need_feedback,
        is_archived=is_archived,
    )
def _possibly_observe_first_response_time(
    session: Session, host_request: HostRequest, user_id: int, response_type: str
) -> None:
    """
    Record the host's first-response time for a host request, if this is their first response.

    Must be called for the host of the request, before the new message is added: the response counts as the
    first one only while the host has no messages in the conversation. The observed value is the number of
    seconds since the conversation was created, labelled with both genders and the response type.
    """
    assert host_request.host_user_id == user_id

    host_message_count = session.execute(
        select(func.count())
        .where(Message.conversation_id == host_request.conversation_id)
        .where(Message.author_id == user_id)
    ).scalar_one_or_none()

    # The host has already written something in this conversation, nothing to observe
    if host_message_count != 0:
        return

    def gender_of(some_user_id: int) -> str:
        return session.execute(select(User.gender).where(User.id == some_user_id)).scalar_one()

    host_gender = gender_of(host_request.host_user_id)
    surfer_gender = gender_of(host_request.surfer_user_id)

    histogram = host_request_first_response_histogram.labels(host_gender, surfer_gender, response_type)
    histogram.observe((now() - host_request.conversation.created).total_seconds())
def _is_host_request_long_enough(text: str) -> bool:
    """
    Return whether ``text`` is at least HOST_REQUEST_MIN_LENGTH_UTF16 long, measured in UTF-16 code units.
    """
    # Python's len(str) counts Unicode code points, while Javascript's string.length counts UTF-16 code units,
    # so the two disagree for characters outside the Basic Multilingual Plane.
    # e.g. len("😀") == 1 in Python but "😀".length == 2 in Javascript.
    # To match the frontend's validation, measure the string in utf16 code units.
    text_length_utf16 = len(text.encode("utf-16-le")) // 2  # utf-16-le does not include a prefix BOM code unit.
    return text_length_utf16 >= HOST_REQUEST_MIN_LENGTH_UTF16
188class Requests(requests_pb2_grpc.RequestsServicer):
    def CreateHostRequest(
        self, request: requests_pb2.CreateHostRequestReq, context: CouchersContext, session: Session
    ) -> requests_pb2.CreateHostRequestRes:
        """
        Create a host request from the calling user (the surfer) to the user given by ``request.host_user_id``.

        Validates the caller's profile, the host, the dates, the text length and the rate limit, aborting with
        an error code on the first failed check. It then creates the conversation with its two initial
        messages, the moderation state and the host request, notifies the host, and records metrics and an
        event.

        Returns a response carrying the new host request ID, which is the ID of its conversation.
        """
        user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
        if not has_completed_profile(session, user):
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_request")

        if request.host_user_id == context.user_id:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_request_self")

        # just to check host exists and is visible
        host = session.execute(
            select(User).where(users_visible(context, User)).where(User.id == request.host_user_id)
        ).scalar_one_or_none()
        if not host:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

        from_date = parse_date(request.from_date)
        to_date = parse_date(request.to_date)

        if not from_date or not to_date:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_date")

        # "today" is evaluated in the host's timezone
        today = today_in_timezone(host.timezone)

        # request starts from the past
        if from_date < today:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_before_today")

        # from_date is not >= to_date
        if from_date >= to_date:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_to")

        # No need to check today > to_date

        # the stay may start at most 365 days from today...
        if from_date - today > timedelta(days=365):
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_one_year")

        # ...and may last at most 365 days
        if to_date - from_date > timedelta(days=365):
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_to_after_one_year")

        # Check minimum length
        if not _is_host_request_long_enough(request.text):
            context.abort_with_error_code(
                grpc.StatusCode.INVALID_ARGUMENT,
                "host_request_too_short",
                substitutions={"chars": str(HOST_REQUEST_MIN_LENGTH_UTF16)},
            )

        # Check if user has been sending host requests excessively
        if process_rate_limits_and_check_abort(
            session=session, user_id=context.user_id, action=RateLimitAction.host_request
        ):
            context.abort_with_error_code(
                grpc.StatusCode.RESOURCE_EXHAUSTED,
                "host_request_rate_limit",
                substitutions={"hours": str(RATE_LIMIT_HOURS)},
            )

        conversation = Conversation()
        session.add(conversation)
        # flush so that conversation.id is available below
        session.flush()

        # control message marking the creation of the conversation
        session.add(
            Message(
                conversation_id=conversation.id,
                author_id=context.user_id,
                message_type=MessageType.chat_created,
            )
        )

        # the surfer's request text
        message = Message(
            conversation_id=conversation.id,
            author_id=context.user_id,
            text=request.text,
            message_type=MessageType.text,
        )
        session.add(message)
        # flush so that message.id is available below
        session.flush()

        # Create moderation state for UMS (starts as SHADOWED)
        moderation_state = create_moderation(
            session=session,
            object_type=ModerationObjectType.host_request,
            object_id=conversation.id,
            creator_user_id=context.user_id,
        )

        # the hosting city, location and radius are copied from the host's profile at creation time
        host_request = HostRequest(
            conversation_id=conversation.id,
            surfer_user_id=context.user_id,
            host_user_id=host.id,
            moderation_state_id=moderation_state.id,
            from_date=from_date,
            to_date=to_date,
            status=HostRequestStatus.pending,
            surfer_last_seen_message_id=message.id,
            # TODO: tz
            # timezone=host.timezone,
            hosting_city=host.city,
            hosting_location=host.geom,
            hosting_radius=host.geom_radius,
        )
        session.add(host_request)
        session.flush()

        # notify the host about the new request
        notify(
            session,
            user_id=host_request.host_user_id,
            topic_action=NotificationTopicAction.host_request__create,
            key=str(host_request.conversation_id),
            data=notification_data_pb2.HostRequestCreate(
                host_request=host_request_to_pb(host_request, session, context),
                surfer=user_model_to_pb(host_request.surfer, session, context),
                text=request.text,
            ),
            moderation_state_id=moderation_state.id,
        )

        host_requests_sent_counter.labels(user.gender, host.gender).inc()
        sent_messages_counter.labels(user.gender, "host request send").inc()
        account_age_on_host_request_create_histogram.labels(user.gender, host.gender).observe(
            (now() - user.joined).total_seconds()
        )
        log_event(
            context,
            session,
            "host_request.created",
            {
                "host_request_id": host_request.conversation_id,
                "host_id": host.id,
                "surfer_gender": user.gender,
                "host_gender": host.gender,
                "city": host.city,
                "from_date": str(from_date),
                "to_date": str(to_date),
                "nights": (to_date - from_date).days,
            },
        )

        return requests_pb2.CreateHostRequestRes(host_request_id=host_request.conversation_id)
331 def GetHostRequest(
332 self, request: requests_pb2.GetHostRequestReq, context: CouchersContext, session: Session
333 ) -> requests_pb2.HostRequest:
334 host_request = session.execute(
335 where_moderated_content_visible(
336 where_users_column_visible(
337 where_users_column_visible(
338 select(HostRequest),
339 context,
340 HostRequest.surfer_user_id,
341 ),
342 context,
343 HostRequest.host_user_id,
344 ),
345 context,
346 HostRequest,
347 is_list_operation=False,
348 )
349 .where(HostRequest.conversation_id == request.host_request_id)
350 .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id))
351 ).scalar_one_or_none()
353 if not host_request:
354 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
356 return host_request_to_pb(host_request, session, context)
358 def ListHostRequests(
359 self, request: requests_pb2.ListHostRequestsReq, context: CouchersContext, session: Session
360 ) -> requests_pb2.ListHostRequestsRes:
361 if request.only_sent and request.only_received: 361 ↛ 362line 361 didn't jump to line 362 because the condition on line 361 was never true
362 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received")
364 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
365 pagination = min(pagination, MAX_PAGE_SIZE)
367 # By outer joining messages on itself where the second id is bigger, only the highest IDs will have
368 # none as message_2.id. So just filter for these to get the highest messages only.
369 # See https://stackoverflow.com/a/27802817/6115336
370 message_2 = aliased(Message)
371 statement = where_moderated_content_visible(
372 where_users_column_visible(
373 where_users_column_visible(
374 select(Message, HostRequest, Conversation)
375 .outerjoin(
376 message_2, and_(Message.conversation_id == message_2.conversation_id, Message.id < message_2.id)
377 )
378 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
379 .join(Conversation, Conversation.id == HostRequest.conversation_id),
380 context,
381 HostRequest.surfer_user_id,
382 ),
383 context,
384 HostRequest.host_user_id,
385 ),
386 context,
387 HostRequest,
388 is_list_operation=True,
389 ).where(message_2.id == None)
391 if request.last_request_id != 0:
392 statement = statement.where(Message.id < request.last_request_id)
393 if request.only_sent:
394 statement = statement.where(HostRequest.surfer_user_id == context.user_id)
395 elif request.only_received:
396 statement = statement.where(HostRequest.host_user_id == context.user_id)
397 elif request.HasField("only_archived"):
398 statement = statement.where(
399 or_(
400 and_(
401 HostRequest.surfer_user_id == context.user_id,
402 HostRequest.is_surfer_archived == request.only_archived,
403 ),
404 and_(
405 HostRequest.host_user_id == context.user_id,
406 HostRequest.is_host_archived == request.only_archived,
407 ),
408 )
409 )
410 else:
411 statement = statement.where(
412 or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id)
413 )
415 # TODO: I considered having the latest control message be the single source of truth for
416 # the HostRequest.status, but decided against it because of this filter.
417 # Another possibility is to filter in the python instead of SQL, but that's slower
418 if request.only_active:
419 statement = statement.where(
420 or_(
421 HostRequest.status == HostRequestStatus.pending,
422 HostRequest.status == HostRequestStatus.accepted,
423 HostRequest.status == HostRequestStatus.confirmed,
424 )
425 )
426 statement = statement.where(HostRequest.end_time <= func.now())
428 statement = statement.order_by(Message.id.desc()).limit(pagination + 1)
429 results = session.execute(statement).all()
431 host_requests = []
432 for result in results[:pagination]:
433 lat, lng = get_coordinates(result.HostRequest.hosting_location)
434 host_requests.append(
435 requests_pb2.HostRequest(
436 host_request_id=result.HostRequest.conversation_id,
437 surfer_user_id=result.HostRequest.surfer_user_id,
438 host_user_id=result.HostRequest.host_user_id,
439 status=hostrequeststatus2api[result.HostRequest.status],
440 created=Timestamp_from_datetime(result.Conversation.created),
441 from_date=date_to_api(result.HostRequest.from_date),
442 to_date=date_to_api(result.HostRequest.to_date),
443 last_seen_message_id=(
444 result.HostRequest.surfer_last_seen_message_id
445 if context.user_id == result.HostRequest.surfer_user_id
446 else result.HostRequest.host_last_seen_message_id
447 ),
448 latest_message=message_to_pb(result.Message),
449 hosting_city=result.HostRequest.hosting_city,
450 hosting_lat=lat,
451 hosting_lng=lng,
452 hosting_radius=result.HostRequest.hosting_radius,
453 )
454 )
456 last_request_id = min(g.Message.id for g in results[:pagination]) if len(results) > pagination else 0 # TODO
457 no_more = len(results) <= pagination
459 return requests_pb2.ListHostRequestsRes(
460 last_request_id=last_request_id, no_more=no_more, host_requests=host_requests
461 )
    def RespondHostRequest(
        self, request: requests_pb2.RespondHostRequestReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """
        Change the status of a host request on behalf of the caller, optionally attaching a text message.

        Allowed transitions, as enforced below:
        - accepted: host only; not from cancelled, confirmed or accepted
        - rejected: host only; not from cancelled or rejected
        - confirmed: surfer only; only from accepted
        - cancelled: surfer only; not from rejected or cancelled

        Setting the status to pending is never allowed, and a request whose end time has passed cannot be
        changed. Each successful transition notifies the other party, updates metrics and logs an event. A
        control message recording the change is added to the conversation, followed by a text message when
        ``request.text`` is non-empty, and the caller's last-seen message ID is moved to the newest of these.
        """

        def count_host_response(other_user_id: int, response_type: str) -> None:
            # increments the response counters, labelled with the caller's and the other user's gender
            user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
            other_gender = session.execute(select(User.gender).where(User.id == other_user_id)).scalar_one()
            host_request_responses_counter.labels(user_gender, other_gender, response_type).inc()
            sent_messages_counter.labels(user_gender, "host request response").inc()

        host_request = session.execute(
            where_moderated_content_visible(
                where_users_column_visible(
                    where_users_column_visible(
                        select(HostRequest),
                        context,
                        HostRequest.surfer_user_id,
                    ),
                    context,
                    HostRequest.host_user_id,
                ),
                context,
                HostRequest,
                is_list_operation=False,
            ).where(HostRequest.conversation_id == request.host_request_id)
        ).scalar_one_or_none()

        if not host_request:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        # a request the caller is not part of is reported as not found
        if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        if request.status == conversations_pb2.HOST_REQUEST_STATUS_PENDING:
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")

        if host_request.end_time < now():
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_in_past")

        # the control message is filled in by the matching branch below and added to the session afterwards
        control_message = Message(
            message_type=MessageType.host_request_status_changed,
            conversation_id=host_request.conversation_id,
            author_id=context.user_id,
        )

        if request.status == conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED:
            # only host can accept
            if context.user_id != host_request.host_user_id:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "not_the_host")
            # can't accept a cancelled or confirmed request (only reject), or already accepted
            if (
                host_request.status == HostRequestStatus.cancelled
                or host_request.status == HostRequestStatus.confirmed
                or host_request.status == HostRequestStatus.accepted
            ):
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            _possibly_observe_first_response_time(session, host_request, context.user_id, "accepted")
            control_message.host_request_status_target = HostRequestStatus.accepted
            host_request.status = HostRequestStatus.accepted
            session.flush()

            notify(
                session,
                user_id=host_request.surfer_user_id,
                topic_action=NotificationTopicAction.host_request__accept,
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestAccept(
                    host_request=host_request_to_pb(host_request, session, context),
                    host=user_model_to_pb(host_request.host, session, context),
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

            count_host_response(host_request.surfer_user_id, "accepted")
            log_event(
                context,
                session,
                "host_request.accepted",
                {
                    "host_request_id": host_request.conversation_id,
                    "surfer_id": host_request.surfer_user_id,
                    "host_id": host_request.host_user_id,
                    "surfer_gender": host_request.surfer.gender,
                    "host_gender": host_request.host.gender,
                    "from_date": str(host_request.from_date),
                    "to_date": str(host_request.to_date),
                    "host_city": host_request.hosting_city,
                },
            )

        if request.status == conversations_pb2.HOST_REQUEST_STATUS_REJECTED:
            # only host can reject
            if context.user_id != host_request.host_user_id:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            # can't reject a cancelled or already rejected request
            if host_request.status == HostRequestStatus.cancelled or host_request.status == HostRequestStatus.rejected:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            _possibly_observe_first_response_time(session, host_request, context.user_id, "rejected")
            control_message.host_request_status_target = HostRequestStatus.rejected
            host_request.status = HostRequestStatus.rejected
            session.flush()

            notify(
                session,
                user_id=host_request.surfer_user_id,
                topic_action=NotificationTopicAction.host_request__reject,
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestReject(
                    host_request=host_request_to_pb(host_request, session, context),
                    host=user_model_to_pb(host_request.host, session, context),
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

            count_host_response(host_request.surfer_user_id, "rejected")
            log_event(
                context,
                session,
                "host_request.rejected",
                {
                    "host_request_id": host_request.conversation_id,
                    "surfer_id": host_request.surfer_user_id,
                    "host_id": host_request.host_user_id,
                    "surfer_gender": host_request.surfer.gender,
                    "host_gender": host_request.host.gender,
                    "from_date": str(host_request.from_date),
                    "to_date": str(host_request.to_date),
                    "host_city": host_request.hosting_city,
                },
            )

        if request.status == conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED:
            # only surfer can confirm
            if context.user_id != host_request.surfer_user_id:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            # can only confirm an accepted request
            if host_request.status != HostRequestStatus.accepted:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            control_message.host_request_status_target = HostRequestStatus.confirmed
            host_request.status = HostRequestStatus.confirmed
            session.flush()

            notify(
                session,
                user_id=host_request.host_user_id,
                topic_action=NotificationTopicAction.host_request__confirm,
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestConfirm(
                    host_request=host_request_to_pb(host_request, session, context),
                    surfer=user_model_to_pb(host_request.surfer, session, context),
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

            count_host_response(host_request.host_user_id, "confirmed")
            log_event(
                context,
                session,
                "host_request.confirmed",
                {
                    "host_request_id": host_request.conversation_id,
                    "surfer_id": host_request.surfer_user_id,
                    "host_id": host_request.host_user_id,
                    "surfer_gender": host_request.surfer.gender,
                    "host_gender": host_request.host.gender,
                    "from_date": str(host_request.from_date),
                    "to_date": str(host_request.to_date),
                    "host_city": host_request.hosting_city,
                },
            )

        if request.status == conversations_pb2.HOST_REQUEST_STATUS_CANCELLED:
            # only surfer can cancel
            if context.user_id != host_request.surfer_user_id:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            # can't cancel an already cancelled or rejected request
            if host_request.status == HostRequestStatus.rejected or host_request.status == HostRequestStatus.cancelled:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            control_message.host_request_status_target = HostRequestStatus.cancelled
            host_request.status = HostRequestStatus.cancelled
            session.flush()

            notify(
                session,
                user_id=host_request.host_user_id,
                topic_action=NotificationTopicAction.host_request__cancel,
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestCancel(
                    host_request=host_request_to_pb(host_request, session, context),
                    surfer=user_model_to_pb(host_request.surfer, session, context),
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

            count_host_response(host_request.host_user_id, "cancelled")
            log_event(
                context,
                session,
                "host_request.cancelled",
                {
                    "host_request_id": host_request.conversation_id,
                    "surfer_id": host_request.surfer_user_id,
                    "host_id": host_request.host_user_id,
                    "surfer_gender": host_request.surfer.gender,
                    "host_gender": host_request.host.gender,
                    "from_date": str(host_request.from_date),
                    "to_date": str(host_request.to_date),
                    "host_city": host_request.hosting_city,
                },
            )

        session.add(control_message)

        # an optional text message follows the control message and then becomes the latest message
        if request.text:
            latest_message = Message(
                conversation_id=host_request.conversation_id,
                text=request.text,
                author_id=context.user_id,
                message_type=MessageType.text,
            )

            session.add(latest_message)
        else:
            latest_message = control_message

        # flush so that latest_message.id is available below
        session.flush()

        # the caller has seen everything up to and including what they just sent
        if host_request.surfer_user_id == context.user_id:
            host_request.surfer_last_seen_message_id = latest_message.id
        else:
            host_request.host_last_seen_message_id = latest_message.id
        session.commit()

        return empty_pb2.Empty()
697 def GetHostRequestMessages(
698 self, request: requests_pb2.GetHostRequestMessagesReq, context: CouchersContext, session: Session
699 ) -> requests_pb2.GetHostRequestMessagesRes:
700 host_request = session.execute(
701 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False).where(
702 HostRequest.conversation_id == request.host_request_id
703 )
704 ).scalar_one_or_none()
706 if not host_request: 706 ↛ 707line 706 didn't jump to line 707 because the condition on line 706 was never true
707 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
709 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 709 ↛ 710line 709 didn't jump to line 710 because the condition on line 709 was never true
710 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
712 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
713 pagination = min(pagination, MAX_PAGE_SIZE)
715 messages = (
716 session.execute(
717 select(Message)
718 .where(Message.conversation_id == host_request.conversation_id)
719 .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0)))
720 .order_by(Message.id.desc())
721 .limit(pagination + 1)
722 )
723 .scalars()
724 .all()
725 )
727 no_more = len(messages) <= pagination
729 last_message_id = min(m.id if m else 1 for m in messages[:pagination]) if len(messages) > 0 else 0
731 return requests_pb2.GetHostRequestMessagesRes(
732 last_message_id=last_message_id,
733 no_more=no_more,
734 messages=[message_to_pb(message) for message in messages[:pagination]],
735 )
737 def SendHostRequestMessage(
738 self, request: requests_pb2.SendHostRequestMessageReq, context: CouchersContext, session: Session
739 ) -> empty_pb2.Empty:
740 if request.text == "":
741 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")
742 host_request = session.execute(
743 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False).where(
744 HostRequest.conversation_id == request.host_request_id
745 )
746 ).scalar_one_or_none()
748 if not host_request:
749 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
751 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
752 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
754 if host_request.host_user_id == context.user_id:
755 _possibly_observe_first_response_time(session, host_request, context.user_id, "message")
757 message = Message(
758 conversation_id=host_request.conversation_id,
759 author_id=context.user_id,
760 message_type=MessageType.text,
761 text=request.text,
762 )
764 session.add(message)
765 session.flush()
767 if host_request.surfer_user_id == context.user_id:
768 host_request.surfer_last_seen_message_id = message.id
770 notify(
771 session,
772 user_id=host_request.host_user_id,
773 topic_action=NotificationTopicAction.host_request__message,
774 key=str(host_request.conversation_id),
775 data=notification_data_pb2.HostRequestMessage(
776 host_request=host_request_to_pb(host_request, session, context),
777 user=user_model_to_pb(host_request.surfer, session, context),
778 text=request.text,
779 am_host=True,
780 ),
781 moderation_state_id=host_request.moderation_state_id,
782 )
784 else:
785 host_request.host_last_seen_message_id = message.id
787 notify(
788 session,
789 user_id=host_request.surfer_user_id,
790 topic_action=NotificationTopicAction.host_request__message,
791 key=str(host_request.conversation_id),
792 data=notification_data_pb2.HostRequestMessage(
793 host_request=host_request_to_pb(host_request, session, context),
794 user=user_model_to_pb(host_request.host, session, context),
795 text=request.text,
796 am_host=False,
797 ),
798 moderation_state_id=host_request.moderation_state_id,
799 )
801 session.commit()
803 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
804 sent_messages_counter.labels(user_gender, "host request").inc()
805 log_event(
806 context,
807 session,
808 "host_request.message_sent",
809 {
810 "host_request_id": host_request.conversation_id,
811 "surfer_id": host_request.surfer_user_id,
812 "host_id": host_request.host_user_id,
813 "role": "host" if context.user_id == host_request.host_user_id else "surfer",
814 "host_city": host_request.hosting_city,
815 },
816 )
818 return empty_pb2.Empty()
def GetHostRequestUpdates(
    self, request: requests_pb2.GetHostRequestUpdatesReq, context: CouchersContext, session: Session
) -> requests_pb2.GetHostRequestUpdatesRes:
    """
    Return host request messages with an id greater than ``request.newest_message_id``.

    Only messages belonging to host requests where the caller is the surfer or the host are returned;
    ``only_sent`` narrows this to requests where the caller is the surfer, ``only_received`` to requests
    where the caller is the host. Results are ordered by ascending message id.

    The page size is ``request.number`` (falling back to ``DEFAULT_PAGINATION_LENGTH`` when it is not
    positive), capped at ``MAX_PAGE_SIZE``. ``no_more`` in the response is true when the returned page
    contains every remaining update.

    Aborts with INVALID_ARGUMENT when both ``only_sent`` and ``only_received`` are set, when
    ``newest_message_id`` is 0, or when no message with that id exists.
    """
    if request.only_sent and request.only_received:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received")

    if request.newest_message_id == 0:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

    # the cursor must refer to a message that actually exists
    if not session.execute(select(Message).where(Message.id == request.newest_message_id)).scalar_one_or_none():
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

    pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
    pagination = min(pagination, MAX_PAGE_SIZE)

    # NOTE(review): where_moderated_content_visible is defined elsewhere; presumably it restricts the
    # statement to host requests the caller may see under moderation rules -- confirm against couchers.sql
    statement = where_moderated_content_visible(
        select(
            Message,
            HostRequest.status.label("host_request_status"),
            HostRequest.conversation_id.label("host_request_id"),
        )
        .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
        .where(Message.id > request.newest_message_id),
        context,
        HostRequest,
        is_list_operation=False,
    )

    if request.only_sent:
        statement = statement.where(HostRequest.surfer_user_id == context.user_id)
    elif request.only_received:
        statement = statement.where(HostRequest.host_user_id == context.user_id)
    else:
        statement = statement.where(
            or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id)
        )

    # fetch one row more than the page size so we can tell whether anything is left after this page
    statement = statement.order_by(Message.id.asc()).limit(pagination + 1)
    res = session.execute(statement).all()

    no_more = len(res) <= pagination

    return requests_pb2.GetHostRequestUpdatesRes(
        no_more=no_more,
        updates=[
            requests_pb2.HostRequestUpdate(
                host_request_id=result.host_request_id,
                status=hostrequeststatus2api[result.host_request_status],
                message=message_to_pb(result.Message),
            )
            for result in res[:pagination]
        ],
    )
def MarkLastSeenHostRequest(
    self, request: requests_pb2.MarkLastSeenHostRequestReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Store ``request.last_seen_message_id`` as the caller's last seen message for a host request.

    The marker is kept separately for the surfer and the host, and it can only stay the same or move
    forward.

    Aborts with NOT_FOUND when the host request is not found or the caller is neither its surfer nor
    its host, and with FAILED_PRECONDITION when the new id is lower than the stored one.
    """
    lookup = where_moderated_content_visible(
        select(HostRequest), context, HostRequest, is_list_operation=False
    ).where(HostRequest.conversation_id == request.host_request_id)
    host_request = session.execute(lookup).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    # non-participants get the same error as for a missing request
    participants = (host_request.surfer_user_id, host_request.host_user_id)
    if context.user_id not in participants:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    newly_seen = request.last_seen_message_id

    if context.user_id == host_request.surfer_user_id:
        if host_request.surfer_last_seen_message_id > newly_seen:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
        host_request.surfer_last_seen_message_id = newly_seen
    else:
        if host_request.host_last_seen_message_id > newly_seen:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
        host_request.host_last_seen_message_id = newly_seen

    session.commit()
    return empty_pb2.Empty()
def SetHostRequestArchiveStatus(
    self, request: requests_pb2.SetHostRequestArchiveStatusReq, context: CouchersContext, session: Session
) -> requests_pb2.SetHostRequestArchiveStatusRes:
    """
    Set the caller's archived flag on a host request to ``request.is_archived``.

    The surfer and the host each have their own flag; only the flag of the calling side is changed.
    The response echoes the host request id and the requested archive state.

    Aborts with NOT_FOUND when no host request with that id is found for which the caller is the
    surfer or the host.
    """
    caller_is_participant = or_(
        HostRequest.surfer_user_id == context.user_id,
        HostRequest.host_user_id == context.user_id,
    )
    query = (
        where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False)
        .where(HostRequest.conversation_id == request.host_request_id)
        .where(caller_is_participant)
    )
    host_request = session.execute(query).scalar_one_or_none()

    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    archived = request.is_archived

    # the query already guarantees the caller is one of the two parties
    if context.user_id != host_request.surfer_user_id:
        host_request.is_host_archived = archived
    else:
        host_request.is_surfer_archived = archived

    return requests_pb2.SetHostRequestArchiveStatusRes(
        host_request_id=host_request.conversation_id,
        is_archived=archived,
    )
def GetResponseRate(
    self, request: requests_pb2.GetResponseRateReq, context: CouchersContext, session: Session
) -> requests_pb2.GetResponseRateRes:
    """
    Return the response rate of the user with id ``request.user_id``.

    Aborts with NOT_FOUND when no user with that id passes the ``users_visible`` filter for the caller.
    """
    query = (
        select(User.id, UserResponseRate)
        # outer join so that a user without a UserResponseRate row still yields a result row
        .outerjoin(UserResponseRate, UserResponseRate.user_id == User.id)
        .where(users_visible(context, User))
        .where(User.id == request.user_id)
    )
    row = session.execute(query).one_or_none()

    # no matching visible user: abort rather than returning an empty response
    if not row:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    # the user id was only selected to establish existence; just the rates are needed from here
    _, response_rates = row
    rate_fields = response_rate_to_pb(response_rates)
    return requests_pb2.GetResponseRateRes(**rate_fields)  # type: ignore[arg-type]
def SendHostRequestFeedback(
    self, request: requests_pb2.SendHostRequestFeedbackReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Store the host's feedback (request quality and decline reason) about a host request.

    Only the host of the request can leave feedback, and only once per host request. A
    ``host_request.feedback_submitted`` event is logged after the feedback is added to the session.

    Aborts with NOT_FOUND when no host request with that id is found for which the caller is the host,
    and with FAILED_PRECONDITION when the caller already left feedback for it.
    """
    request_query = (
        where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False)
        .where(HostRequest.conversation_id == request.host_request_id)
        .where(HostRequest.host_user_id == context.user_id)
    )
    host_request = session.execute(request_query).scalar_one_or_none()
    if not host_request:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

    existing_feedback_query = (
        select(HostRequestFeedback)
        .where(HostRequestFeedback.host_request_id == host_request.conversation_id)
        .where(HostRequestFeedback.from_user_id == context.user_id)
    )
    existing_feedback = session.execute(existing_feedback_query).scalar_one_or_none()
    if existing_feedback:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_left_host_request_feedback")

    # .get() yields None for a quality value that has no entry in the mapping
    quality = hostrequestquality2sql.get(request.host_request_quality)

    new_feedback = HostRequestFeedback(
        host_request_id=host_request.conversation_id,
        from_user_id=host_request.host_user_id,
        to_user_id=host_request.surfer_user_id,
        request_quality=quality,
        decline_reason=request.decline_reason,
    )
    session.add(new_feedback)

    event_payload = {
        "host_request_id": host_request.conversation_id,
        "surfer_id": host_request.surfer_user_id,
        "host_id": host_request.host_user_id,
        "request_quality": quality.name if quality else None,
        # only record whether a reason was given, not its text
        "has_decline_reason": bool(request.decline_reason),
        "host_city": host_request.hosting_city,
    }
    log_event(context, session, "host_request.feedback_submitted", event_payload)

    return empty_pb2.Empty()