Coverage for src / couchers / servicers / requests.py: 91%
295 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-07 11:26 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-07 11:26 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy import exists, select
7from sqlalchemy.orm import Session, aliased
8from sqlalchemy.sql import and_, func, or_
10from couchers.constants import HOST_REQUEST_MIN_LENGTH_UTF16
11from couchers.context import CouchersContext
12from couchers.materialized_views import UserResponseRate
13from couchers.metrics import (
14 account_age_on_host_request_create_histogram,
15 host_request_first_response_histogram,
16 host_request_responses_counter,
17 host_requests_sent_counter,
18 sent_messages_counter,
19)
20from couchers.models import (
21 Conversation,
22 HostRequest,
23 HostRequestFeedback,
24 HostRequestQuality,
25 HostRequestStatus,
26 Message,
27 MessageType,
28 ModerationObjectType,
29 RateLimitAction,
30 User,
31)
32from couchers.moderation.utils import create_moderation
33from couchers.notifications.notify import notify
34from couchers.proto import conversations_pb2, notification_data_pb2, requests_pb2, requests_pb2_grpc
35from couchers.rate_limits.check import process_rate_limits_and_check_abort
36from couchers.rate_limits.definitions import RATE_LIMIT_HOURS
37from couchers.servicers.api import response_rate_to_pb, user_model_to_pb
38from couchers.sql import to_bool, users_visible, where_moderated_content_visible, where_users_column_visible
39from couchers.utils import (
40 Timestamp_from_datetime,
41 date_to_api,
42 get_coordinates,
43 now,
44 parse_date,
45 today_in_timezone,
46)
logger = logging.getLogger(__name__)

# Page size used when the client does not ask for a positive number of results.
DEFAULT_PAGINATION_LENGTH = 10
# Largest page size that is ever served, whatever the client asks for.
MAX_PAGE_SIZE = 50


# Translates the database host request status enum into the matching protobuf enum value.
hostrequeststatus2api = {
    HostRequestStatus.pending: conversations_pb2.HOST_REQUEST_STATUS_PENDING,
    HostRequestStatus.accepted: conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
    HostRequestStatus.rejected: conversations_pb2.HOST_REQUEST_STATUS_REJECTED,
    HostRequestStatus.confirmed: conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED,
    HostRequestStatus.cancelled: conversations_pb2.HOST_REQUEST_STATUS_CANCELLED,
}
# Translates the protobuf host request quality enum into the database enum.
# LOW and OKAY map onto the database members of the same name.
# NOTE(review): UNSPECIFIED is stored as high_quality; kept unchanged, confirm this is intended.
hostrequestquality2sql = {
    requests_pb2.HOST_REQUEST_QUALITY_UNSPECIFIED: HostRequestQuality.high_quality,
    requests_pb2.HOST_REQUEST_QUALITY_LOW: HostRequestQuality.low_quality,
    requests_pb2.HOST_REQUEST_QUALITY_OKAY: HostRequestQuality.okay_quality,
}
def message_to_pb(message: Message) -> conversations_pb2.Message:
    """
    Converts a message database object into its protocol buffer form

    A normal message carries its text. Any other message is a control message: depending on its message
    type it carries a chat-created payload or a host-request-status-changed payload.
    """
    sent_at = Timestamp_from_datetime(message.time)

    if message.is_normal_message:
        return conversations_pb2.Message(
            message_id=message.id,
            author_user_id=message.author_id,
            time=sent_at,
            text=conversations_pb2.MessageContentText(text=message.text),
        )

    # control message: at most one of the two payloads below ends up set
    chat_created = None
    if message.message_type == MessageType.chat_created:
        chat_created = conversations_pb2.MessageContentChatCreated()

    status_changed = None
    if message.message_type == MessageType.host_request_status_changed:
        status_changed = conversations_pb2.MessageContentHostRequestStatusChanged(
            status=hostrequeststatus2api[message.host_request_status_target]  # type: ignore[index]
        )

    return conversations_pb2.Message(
        message_id=message.id,
        author_user_id=message.author_id,
        time=sent_at,
        chat_created=chat_created,
        host_request_status_changed=status_changed,
    )
def host_request_to_pb(
    host_request: HostRequest, session: Session, context: CouchersContext
) -> requests_pb2.HostRequest:
    """
    Converts a host request database object into its protocol buffer form, as seen by the calling user

    The creation time is taken from the first message of the conversation, and the latest message of the
    conversation is embedded. The "last seen" marker is the calling user's own one. Feedback is flagged as
    needed only when the caller is the host, the request is rejected, and the caller has left no feedback yet.
    """
    conversation_messages = select(Message).where(Message.conversation_id == host_request.conversation_id)
    initial_message = session.execute(conversation_messages.order_by(Message.id.asc()).limit(1)).scalar_one()
    latest_message = session.execute(conversation_messages.order_by(Message.id.desc()).limit(1)).scalar_one()

    lat, lng = get_coordinates(host_request.hosting_location)

    caller_is_host = context.user_id == host_request.host_user_id
    if caller_is_host and host_request.status == HostRequestStatus.rejected:
        feedback_given = session.execute(
            select(
                exists().where(
                    HostRequestFeedback.from_user_id == context.user_id,
                    HostRequestFeedback.host_request_id == host_request.conversation_id,
                )
            )
        ).scalar_one()
        need_feedback = not feedback_given
    else:
        need_feedback = False

    # each participant has their own read marker; anyone but the surfer gets the host's one
    if context.user_id == host_request.surfer_user_id:
        last_seen_message_id = host_request.surfer_last_seen_message_id
    else:
        last_seen_message_id = host_request.host_last_seen_message_id

    return requests_pb2.HostRequest(
        host_request_id=host_request.conversation_id,
        surfer_user_id=host_request.surfer_user_id,
        host_user_id=host_request.host_user_id,
        status=hostrequeststatus2api[host_request.status],
        created=Timestamp_from_datetime(initial_message.time),
        from_date=date_to_api(host_request.from_date),
        to_date=date_to_api(host_request.to_date),
        last_seen_message_id=last_seen_message_id,
        latest_message=message_to_pb(latest_message),
        hosting_city=host_request.hosting_city,
        hosting_lat=lat,
        hosting_lng=lng,
        hosting_radius=host_request.hosting_radius,
        need_host_request_feedback=need_feedback,
    )
def _possibly_observe_first_response_time(
    session: Session, host_request: HostRequest, user_id: int, response_type: str
) -> None:
    """
    Records the time to first response metric if the host has not written anything in the conversation yet

    Must be called for the host of the request, and before the host's new message is added: the host having
    zero messages in the conversation is what identifies a first response. The observed value is the number
    of seconds since the conversation was created, labelled by host gender, surfer gender and response type.
    """
    assert host_request.host_user_id == user_id

    host_message_count = session.execute(
        select(func.count())
        .where(Message.conversation_id == host_request.conversation_id)
        .where(Message.author_id == user_id)
    ).scalar_one_or_none()

    if host_message_count != 0:
        # the host has responded before (or no count came back), so this is not a first response
        return

    host_gender = session.execute(select(User.gender).where(User.id == host_request.host_user_id)).scalar_one()
    surfer_gender = session.execute(select(User.gender).where(User.id == host_request.surfer_user_id)).scalar_one()
    seconds_to_respond = (now() - host_request.conversation.created).total_seconds()
    host_request_first_response_histogram.labels(host_gender, surfer_gender, response_type).observe(
        seconds_to_respond
    )
def _is_host_request_long_enough(text: str) -> bool:
    """
    Tells whether the text has at least HOST_REQUEST_MIN_LENGTH_UTF16 UTF-16 code units

    Python's len(str) counts code points while Javascript's string.length counts UTF-16 code units, so the
    two disagree for characters outside the Basic Multilingual Plane: len("😀") == 1 but "😀".length == 2.
    The length is measured in UTF-16 code units here to match the frontend's validation.
    """
    # utf-16-le emits no BOM, and every code unit is exactly two bytes long
    utf16_bytes = text.encode("utf-16-le")
    code_units = len(utf16_bytes) // 2
    return code_units >= HOST_REQUEST_MIN_LENGTH_UTF16
180class Requests(requests_pb2_grpc.RequestsServicer):
181 def CreateHostRequest(
182 self, request: requests_pb2.CreateHostRequestReq, context: CouchersContext, session: Session
183 ) -> requests_pb2.CreateHostRequestRes:
184 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
185 if not user.has_completed_profile:
186 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_request")
188 if request.host_user_id == context.user_id:
189 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_request_self")
191 # just to check host exists and is visible
192 host = session.execute(
193 select(User).where(users_visible(context, User)).where(User.id == request.host_user_id)
194 ).scalar_one_or_none()
195 if not host:
196 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
198 from_date = parse_date(request.from_date)
199 to_date = parse_date(request.to_date)
201 if not from_date or not to_date:
202 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_date")
204 today = today_in_timezone(host.timezone)
206 # request starts from the past
207 if from_date < today:
208 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_before_today")
210 # from_date is not >= to_date
211 if from_date >= to_date:
212 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_to")
214 # No need to check today > to_date
216 if from_date - today > timedelta(days=365):
217 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_one_year")
219 if to_date - from_date > timedelta(days=365):
220 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_to_after_one_year")
222 # Check minimum length
223 if not _is_host_request_long_enough(request.text):
224 context.abort_with_error_code(
225 grpc.StatusCode.INVALID_ARGUMENT,
226 "host_request_too_short",
227 substitutions={"chars": str(HOST_REQUEST_MIN_LENGTH_UTF16)},
228 )
230 # Check if user has been sending host requests excessively
231 if process_rate_limits_and_check_abort(
232 session=session, user_id=context.user_id, action=RateLimitAction.host_request
233 ):
234 context.abort_with_error_code(
235 grpc.StatusCode.RESOURCE_EXHAUSTED,
236 "host_request_rate_limit",
237 substitutions={"hours": str(RATE_LIMIT_HOURS)},
238 )
240 conversation = Conversation()
241 session.add(conversation)
242 session.flush()
244 session.add(
245 Message(
246 conversation_id=conversation.id,
247 author_id=context.user_id,
248 message_type=MessageType.chat_created,
249 )
250 )
252 message = Message(
253 conversation_id=conversation.id,
254 author_id=context.user_id,
255 text=request.text,
256 message_type=MessageType.text,
257 )
258 session.add(message)
259 session.flush()
261 # Create moderation state for UMS (starts as SHADOWED)
262 moderation_state = create_moderation(
263 session=session,
264 object_type=ModerationObjectType.HOST_REQUEST,
265 object_id=conversation.id,
266 creator_user_id=context.user_id,
267 )
269 host_request = HostRequest(
270 conversation_id=conversation.id,
271 surfer_user_id=context.user_id,
272 host_user_id=host.id,
273 moderation_state_id=moderation_state.id,
274 from_date=from_date,
275 to_date=to_date,
276 status=HostRequestStatus.pending,
277 surfer_last_seen_message_id=message.id,
278 # TODO: tz
279 # timezone=host.timezone,
280 hosting_city=host.city,
281 hosting_location=host.geom,
282 hosting_radius=host.geom_radius,
283 )
284 session.add(host_request)
285 session.flush()
287 notify(
288 session,
289 user_id=host_request.host_user_id,
290 topic_action="host_request:create",
291 key=str(host_request.conversation_id),
292 data=notification_data_pb2.HostRequestCreate(
293 host_request=host_request_to_pb(host_request, session, context),
294 surfer=user_model_to_pb(host_request.surfer, session, context),
295 text=request.text,
296 ),
297 moderation_state_id=moderation_state.id,
298 )
300 host_requests_sent_counter.labels(user.gender, host.gender).inc()
301 sent_messages_counter.labels(user.gender, "host request send").inc()
302 account_age_on_host_request_create_histogram.labels(user.gender, host.gender).observe(
303 (now() - user.joined).total_seconds()
304 )
306 return requests_pb2.CreateHostRequestRes(host_request_id=host_request.conversation_id)
308 def GetHostRequest(
309 self, request: requests_pb2.GetHostRequestReq, context: CouchersContext, session: Session
310 ) -> requests_pb2.HostRequest:
311 host_request = session.execute(
312 where_moderated_content_visible(
313 where_users_column_visible(
314 where_users_column_visible(
315 select(HostRequest),
316 context,
317 HostRequest.surfer_user_id,
318 ),
319 context,
320 HostRequest.host_user_id,
321 ),
322 context,
323 HostRequest,
324 is_list_operation=False,
325 )
326 .where(HostRequest.conversation_id == request.host_request_id)
327 .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id))
328 ).scalar_one_or_none()
330 if not host_request:
331 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
333 return host_request_to_pb(host_request, session, context)
335 def ListHostRequests(
336 self, request: requests_pb2.ListHostRequestsReq, context: CouchersContext, session: Session
337 ) -> requests_pb2.ListHostRequestsRes:
338 if request.only_sent and request.only_received: 338 ↛ 339line 338 didn't jump to line 339 because the condition on line 338 was never true
339 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received")
341 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
342 pagination = min(pagination, MAX_PAGE_SIZE)
344 # By outer joining messages on itself where the second id is bigger, only the highest IDs will have
345 # none as message_2.id. So just filter for these to get the highest messages only.
346 # See https://stackoverflow.com/a/27802817/6115336
347 message_2 = aliased(Message)
348 statement = where_moderated_content_visible(
349 where_users_column_visible(
350 where_users_column_visible(
351 select(Message, HostRequest, Conversation)
352 .outerjoin(
353 message_2, and_(Message.conversation_id == message_2.conversation_id, Message.id < message_2.id)
354 )
355 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
356 .join(Conversation, Conversation.id == HostRequest.conversation_id),
357 context,
358 HostRequest.surfer_user_id,
359 ),
360 context,
361 HostRequest.host_user_id,
362 ),
363 context,
364 HostRequest,
365 is_list_operation=True,
366 ).where(message_2.id == None)
368 if request.last_request_id != 0:
369 statement = statement.where(Message.id < request.last_request_id)
370 if request.only_sent:
371 statement = statement.where(HostRequest.surfer_user_id == context.user_id)
372 elif request.only_received:
373 statement = statement.where(HostRequest.host_user_id == context.user_id)
374 elif request.HasField("only_archived"):
375 statement = statement.where(
376 or_(
377 and_(
378 HostRequest.surfer_user_id == context.user_id,
379 HostRequest.is_surfer_archived == request.only_archived,
380 ),
381 and_(
382 HostRequest.host_user_id == context.user_id,
383 HostRequest.is_host_archived == request.only_archived,
384 ),
385 )
386 )
387 else:
388 statement = statement.where(
389 or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id)
390 )
392 # TODO: I considered having the latest control message be the single source of truth for
393 # the HostRequest.status, but decided against it because of this filter.
394 # Another possibility is to filter in the python instead of SQL, but that's slower
395 if request.only_active:
396 statement = statement.where(
397 or_(
398 HostRequest.status == HostRequestStatus.pending,
399 HostRequest.status == HostRequestStatus.accepted,
400 HostRequest.status == HostRequestStatus.confirmed,
401 )
402 )
403 statement = statement.where(HostRequest.end_time <= func.now())
405 statement = statement.order_by(Message.id.desc()).limit(pagination + 1)
406 results = session.execute(statement).all()
408 host_requests = []
409 for result in results[:pagination]:
410 lat, lng = get_coordinates(result.HostRequest.hosting_location)
411 host_requests.append(
412 requests_pb2.HostRequest(
413 host_request_id=result.HostRequest.conversation_id,
414 surfer_user_id=result.HostRequest.surfer_user_id,
415 host_user_id=result.HostRequest.host_user_id,
416 status=hostrequeststatus2api[result.HostRequest.status],
417 created=Timestamp_from_datetime(result.Conversation.created),
418 from_date=date_to_api(result.HostRequest.from_date),
419 to_date=date_to_api(result.HostRequest.to_date),
420 last_seen_message_id=(
421 result.HostRequest.surfer_last_seen_message_id
422 if context.user_id == result.HostRequest.surfer_user_id
423 else result.HostRequest.host_last_seen_message_id
424 ),
425 latest_message=message_to_pb(result.Message),
426 hosting_city=result.HostRequest.hosting_city,
427 hosting_lat=lat,
428 hosting_lng=lng,
429 hosting_radius=result.HostRequest.hosting_radius,
430 )
431 )
433 last_request_id = min(g.Message.id for g in results[:pagination]) if len(results) > pagination else 0 # TODO
434 no_more = len(results) <= pagination
436 return requests_pb2.ListHostRequestsRes(
437 last_request_id=last_request_id, no_more=no_more, host_requests=host_requests
438 )
440 def RespondHostRequest(
441 self, request: requests_pb2.RespondHostRequestReq, context: CouchersContext, session: Session
442 ) -> empty_pb2.Empty:
443 def count_host_response(other_user_id: int, response_type: str) -> None:
444 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
445 other_gender = session.execute(select(User.gender).where(User.id == other_user_id)).scalar_one()
446 host_request_responses_counter.labels(user_gender, other_gender, response_type).inc()
447 sent_messages_counter.labels(user_gender, "host request response").inc()
449 host_request = session.execute(
450 where_moderated_content_visible(
451 where_users_column_visible(
452 where_users_column_visible(
453 select(HostRequest),
454 context,
455 HostRequest.surfer_user_id,
456 ),
457 context,
458 HostRequest.host_user_id,
459 ),
460 context,
461 HostRequest,
462 is_list_operation=False,
463 ).where(HostRequest.conversation_id == request.host_request_id)
464 ).scalar_one_or_none()
466 if not host_request:
467 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
469 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
470 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
472 if request.status == conversations_pb2.HOST_REQUEST_STATUS_PENDING:
473 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
475 if host_request.end_time < now(): 475 ↛ 476line 475 didn't jump to line 476 because the condition on line 475 was never true
476 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_in_past")
478 control_message = Message()
480 if request.status == conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED:
481 # only host can accept
482 if context.user_id != host_request.host_user_id:
483 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "not_the_host")
484 # can't accept a cancelled or confirmed request (only reject), or already accepted
485 if ( 485 ↛ 490line 485 didn't jump to line 490 because the condition on line 485 was never true
486 host_request.status == HostRequestStatus.cancelled
487 or host_request.status == HostRequestStatus.confirmed
488 or host_request.status == HostRequestStatus.accepted
489 ):
490 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
491 _possibly_observe_first_response_time(session, host_request, context.user_id, "accepted")
492 control_message.host_request_status_target = HostRequestStatus.accepted
493 host_request.status = HostRequestStatus.accepted
494 session.flush()
496 notify(
497 session,
498 user_id=host_request.surfer_user_id,
499 topic_action="host_request:accept",
500 key=str(host_request.conversation_id),
501 data=notification_data_pb2.HostRequestAccept(
502 host_request=host_request_to_pb(host_request, session, context),
503 host=user_model_to_pb(host_request.host, session, context),
504 ),
505 moderation_state_id=host_request.moderation_state_id,
506 )
508 count_host_response(host_request.surfer_user_id, "accepted")
510 if request.status == conversations_pb2.HOST_REQUEST_STATUS_REJECTED:
511 # only host can reject
512 if context.user_id != host_request.host_user_id: 512 ↛ 513line 512 didn't jump to line 513 because the condition on line 512 was never true
513 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
514 # can't reject a cancelled or already rejected request
515 if host_request.status == HostRequestStatus.cancelled or host_request.status == HostRequestStatus.rejected: 515 ↛ 516line 515 didn't jump to line 516 because the condition on line 515 was never true
516 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
517 _possibly_observe_first_response_time(session, host_request, context.user_id, "rejected")
518 control_message.host_request_status_target = HostRequestStatus.rejected
519 host_request.status = HostRequestStatus.rejected
520 session.flush()
522 notify(
523 session,
524 user_id=host_request.surfer_user_id,
525 topic_action="host_request:reject",
526 key=str(host_request.conversation_id),
527 data=notification_data_pb2.HostRequestReject(
528 host_request=host_request_to_pb(host_request, session, context),
529 host=user_model_to_pb(host_request.host, session, context),
530 ),
531 moderation_state_id=host_request.moderation_state_id,
532 )
534 count_host_response(host_request.surfer_user_id, "rejected")
536 if request.status == conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED:
537 # only surfer can confirm
538 if context.user_id != host_request.surfer_user_id:
539 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
540 # can only confirm an accepted request
541 if host_request.status != HostRequestStatus.accepted:
542 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
543 control_message.host_request_status_target = HostRequestStatus.confirmed
544 host_request.status = HostRequestStatus.confirmed
545 session.flush()
547 notify(
548 session,
549 user_id=host_request.host_user_id,
550 topic_action="host_request:confirm",
551 key=str(host_request.conversation_id),
552 data=notification_data_pb2.HostRequestConfirm(
553 host_request=host_request_to_pb(host_request, session, context),
554 surfer=user_model_to_pb(host_request.surfer, session, context),
555 ),
556 moderation_state_id=host_request.moderation_state_id,
557 )
559 count_host_response(host_request.host_user_id, "confirmed")
561 if request.status == conversations_pb2.HOST_REQUEST_STATUS_CANCELLED:
562 # only surfer can cancel
563 if context.user_id != host_request.surfer_user_id:
564 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
565 # can't' cancel an already cancelled or rejected request
566 if host_request.status == HostRequestStatus.rejected or host_request.status == HostRequestStatus.cancelled: 566 ↛ 567line 566 didn't jump to line 567 because the condition on line 566 was never true
567 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
568 control_message.host_request_status_target = HostRequestStatus.cancelled
569 host_request.status = HostRequestStatus.cancelled
570 session.flush()
572 notify(
573 session,
574 user_id=host_request.host_user_id,
575 topic_action="host_request:cancel",
576 key=str(host_request.conversation_id),
577 data=notification_data_pb2.HostRequestCancel(
578 host_request=host_request_to_pb(host_request, session, context),
579 surfer=user_model_to_pb(host_request.surfer, session, context),
580 ),
581 moderation_state_id=host_request.moderation_state_id,
582 )
584 count_host_response(host_request.host_user_id, "cancelled")
586 control_message.message_type = MessageType.host_request_status_changed
587 control_message.conversation_id = host_request.conversation_id
588 control_message.author_id = context.user_id
589 session.add(control_message)
591 if request.text:
592 latest_message = Message()
593 latest_message.conversation_id = host_request.conversation_id
594 latest_message.text = request.text
595 latest_message.author_id = context.user_id
596 latest_message.message_type = MessageType.text
597 session.add(latest_message)
598 else:
599 latest_message = control_message
601 session.flush()
603 if host_request.surfer_user_id == context.user_id:
604 host_request.surfer_last_seen_message_id = latest_message.id
605 else:
606 host_request.host_last_seen_message_id = latest_message.id
607 session.commit()
609 return empty_pb2.Empty()
611 def GetHostRequestMessages(
612 self, request: requests_pb2.GetHostRequestMessagesReq, context: CouchersContext, session: Session
613 ) -> requests_pb2.GetHostRequestMessagesRes:
614 host_request = session.execute(
615 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False).where(
616 HostRequest.conversation_id == request.host_request_id
617 )
618 ).scalar_one_or_none()
620 if not host_request: 620 ↛ 621line 620 didn't jump to line 621 because the condition on line 620 was never true
621 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
623 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 623 ↛ 624line 623 didn't jump to line 624 because the condition on line 623 was never true
624 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
626 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
627 pagination = min(pagination, MAX_PAGE_SIZE)
629 messages = (
630 session.execute(
631 select(Message)
632 .where(Message.conversation_id == host_request.conversation_id)
633 .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0)))
634 .order_by(Message.id.desc())
635 .limit(pagination + 1)
636 )
637 .scalars()
638 .all()
639 )
641 no_more = len(messages) <= pagination
643 last_message_id = min(m.id if m else 1 for m in messages[:pagination]) if len(messages) > 0 else 0
645 return requests_pb2.GetHostRequestMessagesRes(
646 last_message_id=last_message_id,
647 no_more=no_more,
648 messages=[message_to_pb(message) for message in messages[:pagination]],
649 )
651 def SendHostRequestMessage(
652 self, request: requests_pb2.SendHostRequestMessageReq, context: CouchersContext, session: Session
653 ) -> empty_pb2.Empty:
654 if request.text == "":
655 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")
656 host_request = session.execute(
657 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False).where(
658 HostRequest.conversation_id == request.host_request_id
659 )
660 ).scalar_one_or_none()
662 if not host_request:
663 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
665 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
666 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
668 if host_request.host_user_id == context.user_id:
669 _possibly_observe_first_response_time(session, host_request, context.user_id, "message")
671 message = Message()
672 message.conversation_id = host_request.conversation_id
673 message.author_id = context.user_id
674 message.message_type = MessageType.text
675 message.text = request.text
676 session.add(message)
677 session.flush()
679 if host_request.surfer_user_id == context.user_id:
680 host_request.surfer_last_seen_message_id = message.id
682 notify(
683 session,
684 user_id=host_request.host_user_id,
685 topic_action="host_request:message",
686 key=str(host_request.conversation_id),
687 data=notification_data_pb2.HostRequestMessage(
688 host_request=host_request_to_pb(host_request, session, context),
689 user=user_model_to_pb(host_request.surfer, session, context),
690 text=request.text,
691 am_host=True,
692 ),
693 moderation_state_id=host_request.moderation_state_id,
694 )
696 else:
697 host_request.host_last_seen_message_id = message.id
699 notify(
700 session,
701 user_id=host_request.surfer_user_id,
702 topic_action="host_request:message",
703 key=str(host_request.conversation_id),
704 data=notification_data_pb2.HostRequestMessage(
705 host_request=host_request_to_pb(host_request, session, context),
706 user=user_model_to_pb(host_request.host, session, context),
707 text=request.text,
708 am_host=False,
709 ),
710 moderation_state_id=host_request.moderation_state_id,
711 )
713 session.commit()
715 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
716 sent_messages_counter.labels(user_gender, "host request").inc()
718 return empty_pb2.Empty()
720 def GetHostRequestUpdates(
721 self, request: requests_pb2.GetHostRequestUpdatesReq, context: CouchersContext, session: Session
722 ) -> requests_pb2.GetHostRequestUpdatesRes:
723 if request.only_sent and request.only_received: 723 ↛ 724line 723 didn't jump to line 724 because the condition on line 723 was never true
724 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received")
726 if request.newest_message_id == 0:
727 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")
729 if not session.execute(select(Message).where(Message.id == request.newest_message_id)).scalar_one_or_none(): 729 ↛ 730line 729 didn't jump to line 730 because the condition on line 729 was never true
730 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")
732 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
733 pagination = min(pagination, MAX_PAGE_SIZE)
735 statement = where_moderated_content_visible(
736 select(
737 Message,
738 HostRequest.status.label("host_request_status"),
739 HostRequest.conversation_id.label("host_request_id"),
740 )
741 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
742 .where(Message.id > request.newest_message_id),
743 context,
744 HostRequest,
745 is_list_operation=False,
746 )
748 if request.only_sent: 748 ↛ 749line 748 didn't jump to line 749 because the condition on line 748 was never true
749 statement = statement.where(HostRequest.surfer_user_id == context.user_id)
750 elif request.only_received: 750 ↛ 751line 750 didn't jump to line 751 because the condition on line 750 was never true
751 statement = statement.where(HostRequest.host_user_id == context.user_id)
752 else:
753 statement = statement.where(
754 or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id)
755 )
757 statement = statement.order_by(Message.id.asc()).limit(pagination + 1)
758 res = session.execute(statement).all()
760 no_more = len(res) <= pagination
762 last_message_id = min(m.Message.id if m else 1 for m in res[:pagination]) if len(res) > 0 else 0 # TODO
764 return requests_pb2.GetHostRequestUpdatesRes(
765 no_more=no_more,
766 updates=[
767 requests_pb2.HostRequestUpdate(
768 host_request_id=result.host_request_id,
769 status=hostrequeststatus2api[result.host_request_status],
770 message=message_to_pb(result.Message),
771 )
772 for result in res[:pagination]
773 ],
774 )
776 def MarkLastSeenHostRequest(
777 self, request: requests_pb2.MarkLastSeenHostRequestReq, context: CouchersContext, session: Session
778 ) -> empty_pb2.Empty:
779 host_request = session.execute(
780 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False).where(
781 HostRequest.conversation_id == request.host_request_id
782 )
783 ).scalar_one_or_none()
785 if not host_request: 785 ↛ 786line 785 didn't jump to line 786 because the condition on line 785 was never true
786 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
788 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 788 ↛ 789line 788 didn't jump to line 789 because the condition on line 788 was never true
789 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
791 if host_request.surfer_user_id == context.user_id: 791 ↛ 792line 791 didn't jump to line 792 because the condition on line 791 was never true
792 if not host_request.surfer_last_seen_message_id <= request.last_seen_message_id:
793 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
794 host_request.surfer_last_seen_message_id = request.last_seen_message_id
795 else:
796 if not host_request.host_last_seen_message_id <= request.last_seen_message_id:
797 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
798 host_request.host_last_seen_message_id = request.last_seen_message_id
800 session.commit()
801 return empty_pb2.Empty()
803 def SetHostRequestArchiveStatus(
804 self, request: requests_pb2.SetHostRequestArchiveStatusReq, context: CouchersContext, session: Session
805 ) -> requests_pb2.SetHostRequestArchiveStatusRes:
806 host_request = session.execute(
807 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False)
808 .where(HostRequest.conversation_id == request.host_request_id)
809 .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id))
810 ).scalar_one_or_none()
812 if not host_request: 812 ↛ 813line 812 didn't jump to line 813 because the condition on line 812 was never true
813 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
815 if context.user_id == host_request.surfer_user_id: 815 ↛ 818line 815 didn't jump to line 818 because the condition on line 815 was always true
816 host_request.is_surfer_archived = request.is_archived
817 else:
818 host_request.is_host_archived = request.is_archived
820 return requests_pb2.SetHostRequestArchiveStatusRes(
821 host_request_id=host_request.conversation_id,
822 is_archived=request.is_archived,
823 )
825 def GetResponseRate(
826 self, request: requests_pb2.GetResponseRateReq, context: CouchersContext, session: Session
827 ) -> requests_pb2.GetResponseRateRes:
828 user_res = session.execute(
829 select(User.id, UserResponseRate)
830 .outerjoin(UserResponseRate, UserResponseRate.user_id == User.id)
831 .where(users_visible(context, User))
832 .where(User.id == request.user_id)
833 ).one_or_none()
835 # if user doesn't exist, return None
836 if not user_res:
837 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
839 user, response_rates = user_res
840 return requests_pb2.GetResponseRateRes(**response_rate_to_pb(response_rates)) # type: ignore[arg-type]
842 def SendHostRequestFeedback(
843 self, request: requests_pb2.SendHostRequestFeedbackReq, context: CouchersContext, session: Session
844 ) -> empty_pb2.Empty:
845 host_request = session.execute(
846 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False)
847 .where(HostRequest.conversation_id == request.host_request_id)
848 .where(HostRequest.host_user_id == context.user_id)
849 ).scalar_one_or_none()
851 if not host_request:
852 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
854 feedback = session.execute(
855 select(HostRequestFeedback)
856 .where(HostRequestFeedback.host_request_id == host_request.conversation_id)
857 .where(HostRequestFeedback.from_user_id == context.user_id)
858 ).scalar_one_or_none()
860 if feedback:
861 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_left_host_request_feedback")
863 session.add(
864 HostRequestFeedback(
865 host_request_id=host_request.conversation_id,
866 from_user_id=host_request.host_user_id,
867 to_user_id=host_request.surfer_user_id,
868 request_quality=hostrequestquality2sql.get(request.host_request_quality),
869 decline_reason=request.decline_reason,
870 )
871 )
873 return empty_pb2.Empty()