Coverage for src / couchers / servicers / requests.py: 91%
285 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-14 09:03 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-14 09:03 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy import exists, select
7from sqlalchemy.orm import Session, aliased
8from sqlalchemy.sql import and_, func, or_
10from couchers.constants import HOST_REQUEST_MIN_LENGTH_UTF16
11from couchers.context import CouchersContext
12from couchers.materialized_views import UserResponseRate
13from couchers.metrics import (
14 account_age_on_host_request_create_histogram,
15 host_request_first_response_histogram,
16 host_request_responses_counter,
17 host_requests_sent_counter,
18 sent_messages_counter,
19)
20from couchers.models import (
21 Conversation,
22 HostRequest,
23 HostRequestFeedback,
24 HostRequestQuality,
25 HostRequestStatus,
26 Message,
27 MessageType,
28 ModerationObjectType,
29 RateLimitAction,
30 User,
31)
32from couchers.models.notifications import NotificationTopicAction
33from couchers.moderation.utils import create_moderation
34from couchers.notifications.notify import notify
35from couchers.proto import conversations_pb2, notification_data_pb2, requests_pb2, requests_pb2_grpc
36from couchers.rate_limits.check import process_rate_limits_and_check_abort
37from couchers.rate_limits.definitions import RATE_LIMIT_HOURS
38from couchers.servicers.api import response_rate_to_pb, user_model_to_pb
39from couchers.sql import to_bool, users_visible, where_moderated_content_visible, where_users_column_visible
40from couchers.utils import (
41 Timestamp_from_datetime,
42 date_to_api,
43 get_coordinates,
44 now,
45 parse_date,
46 today_in_timezone,
47)
49logger = logging.getLogger(__name__)
51DEFAULT_PAGINATION_LENGTH = 10
52MAX_PAGE_SIZE = 50
55hostrequeststatus2api = {
56 HostRequestStatus.pending: conversations_pb2.HOST_REQUEST_STATUS_PENDING,
57 HostRequestStatus.accepted: conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
58 HostRequestStatus.rejected: conversations_pb2.HOST_REQUEST_STATUS_REJECTED,
59 HostRequestStatus.confirmed: conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED,
60 HostRequestStatus.cancelled: conversations_pb2.HOST_REQUEST_STATUS_CANCELLED,
61}
63hostrequestquality2sql = {
64 requests_pb2.HOST_REQUEST_QUALITY_UNSPECIFIED: HostRequestQuality.high_quality,
65 requests_pb2.HOST_REQUEST_QUALITY_LOW: HostRequestQuality.okay_quality,
66 requests_pb2.HOST_REQUEST_QUALITY_OKAY: HostRequestQuality.low_quality,
67}
70def message_to_pb(message: Message) -> conversations_pb2.Message:
71 """
72 Turns the given message to a protocol buffer
73 """
74 if message.is_normal_message:
75 return conversations_pb2.Message(
76 message_id=message.id,
77 author_user_id=message.author_id,
78 time=Timestamp_from_datetime(message.time),
79 text=conversations_pb2.MessageContentText(text=message.text),
80 )
81 else:
82 return conversations_pb2.Message(
83 message_id=message.id,
84 author_user_id=message.author_id,
85 time=Timestamp_from_datetime(message.time),
86 chat_created=(
87 conversations_pb2.MessageContentChatCreated()
88 if message.message_type == MessageType.chat_created
89 else None
90 ),
91 host_request_status_changed=(
92 conversations_pb2.MessageContentHostRequestStatusChanged(
93 status=hostrequeststatus2api[message.host_request_status_target] # type: ignore[index]
94 )
95 if message.message_type == MessageType.host_request_status_changed
96 else None
97 ),
98 )
101def host_request_to_pb(
102 host_request: HostRequest, session: Session, context: CouchersContext
103) -> requests_pb2.HostRequest:
104 initial_message = session.execute(
105 select(Message)
106 .where(Message.conversation_id == host_request.conversation_id)
107 .order_by(Message.id.asc())
108 .limit(1)
109 ).scalar_one()
111 latest_message = session.execute(
112 select(Message)
113 .where(Message.conversation_id == host_request.conversation_id)
114 .order_by(Message.id.desc())
115 .limit(1)
116 ).scalar_one()
118 lat, lng = get_coordinates(host_request.hosting_location)
120 need_feedback = False
121 if context.user_id == host_request.host_user_id and host_request.status == HostRequestStatus.rejected:
122 need_feedback = not session.execute(
123 select(
124 exists().where(
125 HostRequestFeedback.from_user_id == context.user_id,
126 HostRequestFeedback.host_request_id == host_request.conversation_id,
127 )
128 )
129 ).scalar_one()
131 return requests_pb2.HostRequest(
132 host_request_id=host_request.conversation_id,
133 surfer_user_id=host_request.surfer_user_id,
134 host_user_id=host_request.host_user_id,
135 status=hostrequeststatus2api[host_request.status],
136 created=Timestamp_from_datetime(initial_message.time),
137 from_date=date_to_api(host_request.from_date),
138 to_date=date_to_api(host_request.to_date),
139 last_seen_message_id=(
140 host_request.surfer_last_seen_message_id
141 if context.user_id == host_request.surfer_user_id
142 else host_request.host_last_seen_message_id
143 ),
144 latest_message=message_to_pb(latest_message),
145 hosting_city=host_request.hosting_city,
146 hosting_lat=lat,
147 hosting_lng=lng,
148 hosting_radius=host_request.hosting_radius,
149 need_host_request_feedback=need_feedback,
150 )
153def _possibly_observe_first_response_time(
154 session: Session, host_request: HostRequest, user_id: int, response_type: str
155) -> None:
156 # if this is the first response then there's nothing by this user yet
157 assert host_request.host_user_id == user_id
159 number_messages_by_host = session.execute(
160 select(func.count())
161 .where(Message.conversation_id == host_request.conversation_id)
162 .where(Message.author_id == user_id)
163 ).scalar_one_or_none()
165 if number_messages_by_host == 0:
166 host_gender = session.execute(select(User.gender).where(User.id == host_request.host_user_id)).scalar_one()
167 surfer_gender = session.execute(select(User.gender).where(User.id == host_request.surfer_user_id)).scalar_one()
168 host_request_first_response_histogram.labels(host_gender, surfer_gender, response_type).observe(
169 (now() - host_request.conversation.created).total_seconds()
170 )
173def _is_host_request_long_enough(text: str) -> bool:
174 # Python's len(str) does not match Javascript's string.length.
175 # e.g. len("é") == 2 but "é".length == 1.
176 # To match the frontend's validation, measure the string in utf16 code units.
177 text_length_utf16 = len(text.encode("utf-16-le")) // 2 # utf-16-le does not include a prefix BOM code unit.
178 return text_length_utf16 >= HOST_REQUEST_MIN_LENGTH_UTF16
181class Requests(requests_pb2_grpc.RequestsServicer):
182 def CreateHostRequest(
183 self, request: requests_pb2.CreateHostRequestReq, context: CouchersContext, session: Session
184 ) -> requests_pb2.CreateHostRequestRes:
185 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
186 if not user.has_completed_profile:
187 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_request")
189 if request.host_user_id == context.user_id:
190 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_request_self")
192 # just to check host exists and is visible
193 host = session.execute(
194 select(User).where(users_visible(context, User)).where(User.id == request.host_user_id)
195 ).scalar_one_or_none()
196 if not host:
197 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
199 from_date = parse_date(request.from_date)
200 to_date = parse_date(request.to_date)
202 if not from_date or not to_date:
203 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_date")
205 today = today_in_timezone(host.timezone)
207 # request starts from the past
208 if from_date < today:
209 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_before_today")
211 # from_date is not >= to_date
212 if from_date >= to_date:
213 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_to")
215 # No need to check today > to_date
217 if from_date - today > timedelta(days=365):
218 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_one_year")
220 if to_date - from_date > timedelta(days=365):
221 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_to_after_one_year")
223 # Check minimum length
224 if not _is_host_request_long_enough(request.text):
225 context.abort_with_error_code(
226 grpc.StatusCode.INVALID_ARGUMENT,
227 "host_request_too_short",
228 substitutions={"chars": str(HOST_REQUEST_MIN_LENGTH_UTF16)},
229 )
231 # Check if user has been sending host requests excessively
232 if process_rate_limits_and_check_abort(
233 session=session, user_id=context.user_id, action=RateLimitAction.host_request
234 ):
235 context.abort_with_error_code(
236 grpc.StatusCode.RESOURCE_EXHAUSTED,
237 "host_request_rate_limit",
238 substitutions={"hours": str(RATE_LIMIT_HOURS)},
239 )
241 conversation = Conversation()
242 session.add(conversation)
243 session.flush()
245 session.add(
246 Message(
247 conversation_id=conversation.id,
248 author_id=context.user_id,
249 message_type=MessageType.chat_created,
250 )
251 )
253 message = Message(
254 conversation_id=conversation.id,
255 author_id=context.user_id,
256 text=request.text,
257 message_type=MessageType.text,
258 )
259 session.add(message)
260 session.flush()
262 # Create moderation state for UMS (starts as SHADOWED)
263 moderation_state = create_moderation(
264 session=session,
265 object_type=ModerationObjectType.HOST_REQUEST,
266 object_id=conversation.id,
267 creator_user_id=context.user_id,
268 )
270 host_request = HostRequest(
271 conversation_id=conversation.id,
272 surfer_user_id=context.user_id,
273 host_user_id=host.id,
274 moderation_state_id=moderation_state.id,
275 from_date=from_date,
276 to_date=to_date,
277 status=HostRequestStatus.pending,
278 surfer_last_seen_message_id=message.id,
279 # TODO: tz
280 # timezone=host.timezone,
281 hosting_city=host.city,
282 hosting_location=host.geom,
283 hosting_radius=host.geom_radius,
284 )
285 session.add(host_request)
286 session.flush()
288 notify(
289 session,
290 user_id=host_request.host_user_id,
291 topic_action=NotificationTopicAction.host_request__create,
292 key=str(host_request.conversation_id),
293 data=notification_data_pb2.HostRequestCreate(
294 host_request=host_request_to_pb(host_request, session, context),
295 surfer=user_model_to_pb(host_request.surfer, session, context),
296 text=request.text,
297 ),
298 moderation_state_id=moderation_state.id,
299 )
301 host_requests_sent_counter.labels(user.gender, host.gender).inc()
302 sent_messages_counter.labels(user.gender, "host request send").inc()
303 account_age_on_host_request_create_histogram.labels(user.gender, host.gender).observe(
304 (now() - user.joined).total_seconds()
305 )
307 return requests_pb2.CreateHostRequestRes(host_request_id=host_request.conversation_id)
309 def GetHostRequest(
310 self, request: requests_pb2.GetHostRequestReq, context: CouchersContext, session: Session
311 ) -> requests_pb2.HostRequest:
312 host_request = session.execute(
313 where_moderated_content_visible(
314 where_users_column_visible(
315 where_users_column_visible(
316 select(HostRequest),
317 context,
318 HostRequest.surfer_user_id,
319 ),
320 context,
321 HostRequest.host_user_id,
322 ),
323 context,
324 HostRequest,
325 is_list_operation=False,
326 )
327 .where(HostRequest.conversation_id == request.host_request_id)
328 .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id))
329 ).scalar_one_or_none()
331 if not host_request:
332 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
334 return host_request_to_pb(host_request, session, context)
336 def ListHostRequests(
337 self, request: requests_pb2.ListHostRequestsReq, context: CouchersContext, session: Session
338 ) -> requests_pb2.ListHostRequestsRes:
339 if request.only_sent and request.only_received: 339 ↛ 340line 339 didn't jump to line 340 because the condition on line 339 was never true
340 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received")
342 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
343 pagination = min(pagination, MAX_PAGE_SIZE)
345 # By outer joining messages on itself where the second id is bigger, only the highest IDs will have
346 # none as message_2.id. So just filter for these to get the highest messages only.
347 # See https://stackoverflow.com/a/27802817/6115336
348 message_2 = aliased(Message)
349 statement = where_moderated_content_visible(
350 where_users_column_visible(
351 where_users_column_visible(
352 select(Message, HostRequest, Conversation)
353 .outerjoin(
354 message_2, and_(Message.conversation_id == message_2.conversation_id, Message.id < message_2.id)
355 )
356 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
357 .join(Conversation, Conversation.id == HostRequest.conversation_id),
358 context,
359 HostRequest.surfer_user_id,
360 ),
361 context,
362 HostRequest.host_user_id,
363 ),
364 context,
365 HostRequest,
366 is_list_operation=True,
367 ).where(message_2.id == None)
369 if request.last_request_id != 0:
370 statement = statement.where(Message.id < request.last_request_id)
371 if request.only_sent:
372 statement = statement.where(HostRequest.surfer_user_id == context.user_id)
373 elif request.only_received:
374 statement = statement.where(HostRequest.host_user_id == context.user_id)
375 elif request.HasField("only_archived"):
376 statement = statement.where(
377 or_(
378 and_(
379 HostRequest.surfer_user_id == context.user_id,
380 HostRequest.is_surfer_archived == request.only_archived,
381 ),
382 and_(
383 HostRequest.host_user_id == context.user_id,
384 HostRequest.is_host_archived == request.only_archived,
385 ),
386 )
387 )
388 else:
389 statement = statement.where(
390 or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id)
391 )
393 # TODO: I considered having the latest control message be the single source of truth for
394 # the HostRequest.status, but decided against it because of this filter.
395 # Another possibility is to filter in the python instead of SQL, but that's slower
396 if request.only_active:
397 statement = statement.where(
398 or_(
399 HostRequest.status == HostRequestStatus.pending,
400 HostRequest.status == HostRequestStatus.accepted,
401 HostRequest.status == HostRequestStatus.confirmed,
402 )
403 )
404 statement = statement.where(HostRequest.end_time <= func.now())
406 statement = statement.order_by(Message.id.desc()).limit(pagination + 1)
407 results = session.execute(statement).all()
409 host_requests = []
410 for result in results[:pagination]:
411 lat, lng = get_coordinates(result.HostRequest.hosting_location)
412 host_requests.append(
413 requests_pb2.HostRequest(
414 host_request_id=result.HostRequest.conversation_id,
415 surfer_user_id=result.HostRequest.surfer_user_id,
416 host_user_id=result.HostRequest.host_user_id,
417 status=hostrequeststatus2api[result.HostRequest.status],
418 created=Timestamp_from_datetime(result.Conversation.created),
419 from_date=date_to_api(result.HostRequest.from_date),
420 to_date=date_to_api(result.HostRequest.to_date),
421 last_seen_message_id=(
422 result.HostRequest.surfer_last_seen_message_id
423 if context.user_id == result.HostRequest.surfer_user_id
424 else result.HostRequest.host_last_seen_message_id
425 ),
426 latest_message=message_to_pb(result.Message),
427 hosting_city=result.HostRequest.hosting_city,
428 hosting_lat=lat,
429 hosting_lng=lng,
430 hosting_radius=result.HostRequest.hosting_radius,
431 )
432 )
434 last_request_id = min(g.Message.id for g in results[:pagination]) if len(results) > pagination else 0 # TODO
435 no_more = len(results) <= pagination
437 return requests_pb2.ListHostRequestsRes(
438 last_request_id=last_request_id, no_more=no_more, host_requests=host_requests
439 )
441 def RespondHostRequest(
442 self, request: requests_pb2.RespondHostRequestReq, context: CouchersContext, session: Session
443 ) -> empty_pb2.Empty:
444 def count_host_response(other_user_id: int, response_type: str) -> None:
445 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
446 other_gender = session.execute(select(User.gender).where(User.id == other_user_id)).scalar_one()
447 host_request_responses_counter.labels(user_gender, other_gender, response_type).inc()
448 sent_messages_counter.labels(user_gender, "host request response").inc()
450 host_request = session.execute(
451 where_moderated_content_visible(
452 where_users_column_visible(
453 where_users_column_visible(
454 select(HostRequest),
455 context,
456 HostRequest.surfer_user_id,
457 ),
458 context,
459 HostRequest.host_user_id,
460 ),
461 context,
462 HostRequest,
463 is_list_operation=False,
464 ).where(HostRequest.conversation_id == request.host_request_id)
465 ).scalar_one_or_none()
467 if not host_request:
468 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
470 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
471 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
473 if request.status == conversations_pb2.HOST_REQUEST_STATUS_PENDING:
474 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
476 if host_request.end_time < now(): 476 ↛ 477line 476 didn't jump to line 477 because the condition on line 476 was never true
477 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_in_past")
479 control_message = Message(
480 message_type=MessageType.host_request_status_changed,
481 conversation_id=host_request.conversation_id,
482 author_id=context.user_id,
483 )
485 if request.status == conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED:
486 # only host can accept
487 if context.user_id != host_request.host_user_id:
488 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "not_the_host")
489 # can't accept a cancelled or confirmed request (only reject), or already accepted
490 if ( 490 ↛ 495line 490 didn't jump to line 495 because the condition on line 490 was never true
491 host_request.status == HostRequestStatus.cancelled
492 or host_request.status == HostRequestStatus.confirmed
493 or host_request.status == HostRequestStatus.accepted
494 ):
495 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
496 _possibly_observe_first_response_time(session, host_request, context.user_id, "accepted")
497 control_message.host_request_status_target = HostRequestStatus.accepted
498 host_request.status = HostRequestStatus.accepted
499 session.flush()
501 notify(
502 session,
503 user_id=host_request.surfer_user_id,
504 topic_action=NotificationTopicAction.host_request__accept,
505 key=str(host_request.conversation_id),
506 data=notification_data_pb2.HostRequestAccept(
507 host_request=host_request_to_pb(host_request, session, context),
508 host=user_model_to_pb(host_request.host, session, context),
509 ),
510 moderation_state_id=host_request.moderation_state_id,
511 )
513 count_host_response(host_request.surfer_user_id, "accepted")
515 if request.status == conversations_pb2.HOST_REQUEST_STATUS_REJECTED:
516 # only host can reject
517 if context.user_id != host_request.host_user_id: 517 ↛ 518line 517 didn't jump to line 518 because the condition on line 517 was never true
518 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
519 # can't reject a cancelled or already rejected request
520 if host_request.status == HostRequestStatus.cancelled or host_request.status == HostRequestStatus.rejected: 520 ↛ 521line 520 didn't jump to line 521 because the condition on line 520 was never true
521 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
522 _possibly_observe_first_response_time(session, host_request, context.user_id, "rejected")
523 control_message.host_request_status_target = HostRequestStatus.rejected
524 host_request.status = HostRequestStatus.rejected
525 session.flush()
527 notify(
528 session,
529 user_id=host_request.surfer_user_id,
530 topic_action=NotificationTopicAction.host_request__reject,
531 key=str(host_request.conversation_id),
532 data=notification_data_pb2.HostRequestReject(
533 host_request=host_request_to_pb(host_request, session, context),
534 host=user_model_to_pb(host_request.host, session, context),
535 ),
536 moderation_state_id=host_request.moderation_state_id,
537 )
539 count_host_response(host_request.surfer_user_id, "rejected")
541 if request.status == conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED:
542 # only surfer can confirm
543 if context.user_id != host_request.surfer_user_id:
544 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
545 # can only confirm an accepted request
546 if host_request.status != HostRequestStatus.accepted:
547 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
548 control_message.host_request_status_target = HostRequestStatus.confirmed
549 host_request.status = HostRequestStatus.confirmed
550 session.flush()
552 notify(
553 session,
554 user_id=host_request.host_user_id,
555 topic_action=NotificationTopicAction.host_request__confirm,
556 key=str(host_request.conversation_id),
557 data=notification_data_pb2.HostRequestConfirm(
558 host_request=host_request_to_pb(host_request, session, context),
559 surfer=user_model_to_pb(host_request.surfer, session, context),
560 ),
561 moderation_state_id=host_request.moderation_state_id,
562 )
564 count_host_response(host_request.host_user_id, "confirmed")
566 if request.status == conversations_pb2.HOST_REQUEST_STATUS_CANCELLED:
567 # only surfer can cancel
568 if context.user_id != host_request.surfer_user_id:
569 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
570 # can't' cancel an already cancelled or rejected request
571 if host_request.status == HostRequestStatus.rejected or host_request.status == HostRequestStatus.cancelled: 571 ↛ 572line 571 didn't jump to line 572 because the condition on line 571 was never true
572 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
573 control_message.host_request_status_target = HostRequestStatus.cancelled
574 host_request.status = HostRequestStatus.cancelled
575 session.flush()
577 notify(
578 session,
579 user_id=host_request.host_user_id,
580 topic_action=NotificationTopicAction.host_request__cancel,
581 key=str(host_request.conversation_id),
582 data=notification_data_pb2.HostRequestCancel(
583 host_request=host_request_to_pb(host_request, session, context),
584 surfer=user_model_to_pb(host_request.surfer, session, context),
585 ),
586 moderation_state_id=host_request.moderation_state_id,
587 )
589 count_host_response(host_request.host_user_id, "cancelled")
591 session.add(control_message)
593 if request.text:
594 latest_message = Message(
595 conversation_id=host_request.conversation_id,
596 text=request.text,
597 author_id=context.user_id,
598 message_type=MessageType.text,
599 )
601 session.add(latest_message)
602 else:
603 latest_message = control_message
605 session.flush()
607 if host_request.surfer_user_id == context.user_id:
608 host_request.surfer_last_seen_message_id = latest_message.id
609 else:
610 host_request.host_last_seen_message_id = latest_message.id
611 session.commit()
613 return empty_pb2.Empty()
615 def GetHostRequestMessages(
616 self, request: requests_pb2.GetHostRequestMessagesReq, context: CouchersContext, session: Session
617 ) -> requests_pb2.GetHostRequestMessagesRes:
618 host_request = session.execute(
619 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False).where(
620 HostRequest.conversation_id == request.host_request_id
621 )
622 ).scalar_one_or_none()
624 if not host_request: 624 ↛ 625line 624 didn't jump to line 625 because the condition on line 624 was never true
625 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
627 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 627 ↛ 628line 627 didn't jump to line 628 because the condition on line 627 was never true
628 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
630 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
631 pagination = min(pagination, MAX_PAGE_SIZE)
633 messages = (
634 session.execute(
635 select(Message)
636 .where(Message.conversation_id == host_request.conversation_id)
637 .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0)))
638 .order_by(Message.id.desc())
639 .limit(pagination + 1)
640 )
641 .scalars()
642 .all()
643 )
645 no_more = len(messages) <= pagination
647 last_message_id = min(m.id if m else 1 for m in messages[:pagination]) if len(messages) > 0 else 0
649 return requests_pb2.GetHostRequestMessagesRes(
650 last_message_id=last_message_id,
651 no_more=no_more,
652 messages=[message_to_pb(message) for message in messages[:pagination]],
653 )
655 def SendHostRequestMessage(
656 self, request: requests_pb2.SendHostRequestMessageReq, context: CouchersContext, session: Session
657 ) -> empty_pb2.Empty:
658 if request.text == "":
659 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")
660 host_request = session.execute(
661 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False).where(
662 HostRequest.conversation_id == request.host_request_id
663 )
664 ).scalar_one_or_none()
666 if not host_request:
667 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
669 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
670 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
672 if host_request.host_user_id == context.user_id:
673 _possibly_observe_first_response_time(session, host_request, context.user_id, "message")
675 message = Message(
676 conversation_id=host_request.conversation_id,
677 author_id=context.user_id,
678 message_type=MessageType.text,
679 text=request.text,
680 )
682 session.add(message)
683 session.flush()
685 if host_request.surfer_user_id == context.user_id:
686 host_request.surfer_last_seen_message_id = message.id
688 notify(
689 session,
690 user_id=host_request.host_user_id,
691 topic_action=NotificationTopicAction.host_request__message,
692 key=str(host_request.conversation_id),
693 data=notification_data_pb2.HostRequestMessage(
694 host_request=host_request_to_pb(host_request, session, context),
695 user=user_model_to_pb(host_request.surfer, session, context),
696 text=request.text,
697 am_host=True,
698 ),
699 moderation_state_id=host_request.moderation_state_id,
700 )
702 else:
703 host_request.host_last_seen_message_id = message.id
705 notify(
706 session,
707 user_id=host_request.surfer_user_id,
708 topic_action=NotificationTopicAction.host_request__message,
709 key=str(host_request.conversation_id),
710 data=notification_data_pb2.HostRequestMessage(
711 host_request=host_request_to_pb(host_request, session, context),
712 user=user_model_to_pb(host_request.host, session, context),
713 text=request.text,
714 am_host=False,
715 ),
716 moderation_state_id=host_request.moderation_state_id,
717 )
719 session.commit()
721 user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
722 sent_messages_counter.labels(user_gender, "host request").inc()
724 return empty_pb2.Empty()
726 def GetHostRequestUpdates(
727 self, request: requests_pb2.GetHostRequestUpdatesReq, context: CouchersContext, session: Session
728 ) -> requests_pb2.GetHostRequestUpdatesRes:
729 if request.only_sent and request.only_received: 729 ↛ 730line 729 didn't jump to line 730 because the condition on line 729 was never true
730 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received")
732 if request.newest_message_id == 0:
733 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")
735 if not session.execute(select(Message).where(Message.id == request.newest_message_id)).scalar_one_or_none(): 735 ↛ 736line 735 didn't jump to line 736 because the condition on line 735 was never true
736 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")
738 pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
739 pagination = min(pagination, MAX_PAGE_SIZE)
741 statement = where_moderated_content_visible(
742 select(
743 Message,
744 HostRequest.status.label("host_request_status"),
745 HostRequest.conversation_id.label("host_request_id"),
746 )
747 .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
748 .where(Message.id > request.newest_message_id),
749 context,
750 HostRequest,
751 is_list_operation=False,
752 )
754 if request.only_sent: 754 ↛ 755line 754 didn't jump to line 755 because the condition on line 754 was never true
755 statement = statement.where(HostRequest.surfer_user_id == context.user_id)
756 elif request.only_received: 756 ↛ 757line 756 didn't jump to line 757 because the condition on line 756 was never true
757 statement = statement.where(HostRequest.host_user_id == context.user_id)
758 else:
759 statement = statement.where(
760 or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id)
761 )
763 statement = statement.order_by(Message.id.asc()).limit(pagination + 1)
764 res = session.execute(statement).all()
766 no_more = len(res) <= pagination
768 last_message_id = min(m.Message.id if m else 1 for m in res[:pagination]) if len(res) > 0 else 0 # TODO
770 return requests_pb2.GetHostRequestUpdatesRes(
771 no_more=no_more,
772 updates=[
773 requests_pb2.HostRequestUpdate(
774 host_request_id=result.host_request_id,
775 status=hostrequeststatus2api[result.host_request_status],
776 message=message_to_pb(result.Message),
777 )
778 for result in res[:pagination]
779 ],
780 )
782 def MarkLastSeenHostRequest(
783 self, request: requests_pb2.MarkLastSeenHostRequestReq, context: CouchersContext, session: Session
784 ) -> empty_pb2.Empty:
785 host_request = session.execute(
786 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False).where(
787 HostRequest.conversation_id == request.host_request_id
788 )
789 ).scalar_one_or_none()
791 if not host_request: 791 ↛ 792line 791 didn't jump to line 792 because the condition on line 791 was never true
792 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
794 if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id: 794 ↛ 795line 794 didn't jump to line 795 because the condition on line 794 was never true
795 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
797 if host_request.surfer_user_id == context.user_id: 797 ↛ 798line 797 didn't jump to line 798 because the condition on line 797 was never true
798 if not host_request.surfer_last_seen_message_id <= request.last_seen_message_id:
799 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
800 host_request.surfer_last_seen_message_id = request.last_seen_message_id
801 else:
802 if not host_request.host_last_seen_message_id <= request.last_seen_message_id:
803 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
804 host_request.host_last_seen_message_id = request.last_seen_message_id
806 session.commit()
807 return empty_pb2.Empty()
809 def SetHostRequestArchiveStatus(
810 self, request: requests_pb2.SetHostRequestArchiveStatusReq, context: CouchersContext, session: Session
811 ) -> requests_pb2.SetHostRequestArchiveStatusRes:
812 host_request = session.execute(
813 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False)
814 .where(HostRequest.conversation_id == request.host_request_id)
815 .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id))
816 ).scalar_one_or_none()
818 if not host_request: 818 ↛ 819line 818 didn't jump to line 819 because the condition on line 818 was never true
819 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
821 if context.user_id == host_request.surfer_user_id: 821 ↛ 824line 821 didn't jump to line 824 because the condition on line 821 was always true
822 host_request.is_surfer_archived = request.is_archived
823 else:
824 host_request.is_host_archived = request.is_archived
826 return requests_pb2.SetHostRequestArchiveStatusRes(
827 host_request_id=host_request.conversation_id,
828 is_archived=request.is_archived,
829 )
831 def GetResponseRate(
832 self, request: requests_pb2.GetResponseRateReq, context: CouchersContext, session: Session
833 ) -> requests_pb2.GetResponseRateRes:
834 user_res = session.execute(
835 select(User.id, UserResponseRate)
836 .outerjoin(UserResponseRate, UserResponseRate.user_id == User.id)
837 .where(users_visible(context, User))
838 .where(User.id == request.user_id)
839 ).one_or_none()
841 # if user doesn't exist, return None
842 if not user_res:
843 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
845 user, response_rates = user_res
846 return requests_pb2.GetResponseRateRes(**response_rate_to_pb(response_rates)) # type: ignore[arg-type]
848 def SendHostRequestFeedback(
849 self, request: requests_pb2.SendHostRequestFeedbackReq, context: CouchersContext, session: Session
850 ) -> empty_pb2.Empty:
851 host_request = session.execute(
852 where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False)
853 .where(HostRequest.conversation_id == request.host_request_id)
854 .where(HostRequest.host_user_id == context.user_id)
855 ).scalar_one_or_none()
857 if not host_request:
858 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")
860 feedback = session.execute(
861 select(HostRequestFeedback)
862 .where(HostRequestFeedback.host_request_id == host_request.conversation_id)
863 .where(HostRequestFeedback.from_user_id == context.user_id)
864 ).scalar_one_or_none()
866 if feedback:
867 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_left_host_request_feedback")
869 session.add(
870 HostRequestFeedback(
871 host_request_id=host_request.conversation_id,
872 from_user_id=host_request.host_user_id,
873 to_user_id=host_request.surfer_user_id,
874 request_quality=hostrequestquality2sql.get(request.host_request_quality),
875 decline_reason=request.decline_reason,
876 )
877 )
879 return empty_pb2.Empty()