Coverage for src / couchers / servicers / requests.py: 91%
286 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-28 06:27 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-28 06:27 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy import exists, select
7from sqlalchemy.orm import Session, aliased
8from sqlalchemy.sql import and_, func, or_
10from couchers.constants import HOST_REQUEST_MIN_LENGTH_UTF16
11from couchers.context import CouchersContext
12from couchers.helpers.completed_profile import has_completed_profile
13from couchers.materialized_views import UserResponseRate
14from couchers.metrics import (
15 account_age_on_host_request_create_histogram,
16 host_request_first_response_histogram,
17 host_request_responses_counter,
18 host_requests_sent_counter,
19 sent_messages_counter,
20)
21from couchers.models import (
22 Conversation,
23 HostRequest,
24 HostRequestFeedback,
25 HostRequestQuality,
26 HostRequestStatus,
27 Message,
28 MessageType,
29 ModerationObjectType,
30 RateLimitAction,
31 User,
32)
33from couchers.models.notifications import NotificationTopicAction
34from couchers.moderation.utils import create_moderation
35from couchers.notifications.notify import notify
36from couchers.proto import conversations_pb2, notification_data_pb2, requests_pb2, requests_pb2_grpc
37from couchers.rate_limits.check import process_rate_limits_and_check_abort
38from couchers.rate_limits.definitions import RATE_LIMIT_HOURS
39from couchers.servicers.api import response_rate_to_pb, user_model_to_pb
40from couchers.sql import to_bool, users_visible, where_moderated_content_visible, where_users_column_visible
41from couchers.utils import (
42 Timestamp_from_datetime,
43 date_to_api,
44 get_coordinates,
45 now,
46 parse_date,
47 today_in_timezone,
48)
logger = logging.getLogger(__name__)

# Page size used when a request does not ask for a positive number of items.
DEFAULT_PAGINATION_LENGTH = 10
# Upper bound applied to any client-requested page size.
MAX_PAGE_SIZE = 50


# Database host request status -> API (protobuf) host request status.
hostrequeststatus2api = {
    HostRequestStatus.pending: conversations_pb2.HOST_REQUEST_STATUS_PENDING,
    HostRequestStatus.accepted: conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
    HostRequestStatus.rejected: conversations_pb2.HOST_REQUEST_STATUS_REJECTED,
    HostRequestStatus.confirmed: conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED,
    HostRequestStatus.cancelled: conversations_pb2.HOST_REQUEST_STATUS_CANCELLED,
}

# API (protobuf) host request quality -> database host request quality.
# LOW and OKAY map onto their identically named database values (they used to be swapped).
# NOTE(review): UNSPECIFIED is stored as high_quality; presumably "no complaint" is meant to
# count as high quality -- confirm against the frontend's feedback form.
hostrequestquality2sql = {
    requests_pb2.HOST_REQUEST_QUALITY_UNSPECIFIED: HostRequestQuality.high_quality,
    requests_pb2.HOST_REQUEST_QUALITY_LOW: HostRequestQuality.low_quality,
    requests_pb2.HOST_REQUEST_QUALITY_OKAY: HostRequestQuality.okay_quality,
}
def message_to_pb(message: Message) -> conversations_pb2.Message:
    """
    Converts a Message row into its protocol buffer form.

    Normal messages carry their text. Any other message carries the control payload matching its
    message type (`chat_created` or `host_request_status_changed`); payloads that do not apply are
    passed as None.
    """
    if message.is_normal_message:
        return conversations_pb2.Message(
            message_id=message.id,
            author_user_id=message.author_id,
            time=Timestamp_from_datetime(message.time),
            text=conversations_pb2.MessageContentText(text=message.text),
        )

    sent_at = Timestamp_from_datetime(message.time)

    chat_created_payload = None
    if message.message_type == MessageType.chat_created:
        chat_created_payload = conversations_pb2.MessageContentChatCreated()

    status_changed_payload = None
    if message.message_type == MessageType.host_request_status_changed:
        # the status target is only looked up for status-change control messages
        status_changed_payload = conversations_pb2.MessageContentHostRequestStatusChanged(
            status=hostrequeststatus2api[message.host_request_status_target]  # type: ignore[index]
        )

    return conversations_pb2.Message(
        message_id=message.id,
        author_user_id=message.author_id,
        time=sent_at,
        chat_created=chat_created_payload,
        host_request_status_changed=status_changed_payload,
    )
def host_request_to_pb(
    host_request: HostRequest, session: Session, context: CouchersContext
) -> requests_pb2.HostRequest:
    """
    Builds the protocol buffer HostRequest for `host_request` from the point of view of the
    calling user (`context.user_id`).

    The creation time is taken from the first message of the conversation, and the latest message
    is embedded in the result. `last_seen_message_id` is the surfer's marker when the caller is
    the surfer, otherwise the host's. `need_host_request_feedback` is only ever true for the host
    of a rejected request who has no HostRequestFeedback row for it yet.
    """
    conversation_messages = select(Message).where(Message.conversation_id == host_request.conversation_id)

    first_message = session.execute(conversation_messages.order_by(Message.id.asc()).limit(1)).scalar_one()
    newest_message = session.execute(conversation_messages.order_by(Message.id.desc()).limit(1)).scalar_one()

    lat, lng = get_coordinates(host_request.hosting_location)

    caller_is_host = context.user_id == host_request.host_user_id
    feedback_due = False
    if caller_is_host and host_request.status == HostRequestStatus.rejected:
        feedback_exists = session.execute(
            select(
                exists().where(
                    HostRequestFeedback.from_user_id == context.user_id,
                    HostRequestFeedback.host_request_id == host_request.conversation_id,
                )
            )
        ).scalar_one()
        feedback_due = not feedback_exists

    if context.user_id == host_request.surfer_user_id:
        last_seen = host_request.surfer_last_seen_message_id
    else:
        last_seen = host_request.host_last_seen_message_id

    return requests_pb2.HostRequest(
        host_request_id=host_request.conversation_id,
        surfer_user_id=host_request.surfer_user_id,
        host_user_id=host_request.host_user_id,
        status=hostrequeststatus2api[host_request.status],
        created=Timestamp_from_datetime(first_message.time),
        from_date=date_to_api(host_request.from_date),
        to_date=date_to_api(host_request.to_date),
        last_seen_message_id=last_seen,
        latest_message=message_to_pb(newest_message),
        hosting_city=host_request.hosting_city,
        hosting_lat=lat,
        hosting_lng=lng,
        hosting_radius=host_request.hosting_radius,
        need_host_request_feedback=feedback_due,
    )
def _possibly_observe_first_response_time(
    session: Session, host_request: HostRequest, user_id: int, response_type: str
) -> None:
    """
    Records the host's first-response latency for `host_request`, if this is their first response.

    Must be called before the host's new message is stored: the observation is only made when the
    host has authored no message in the conversation so far. The histogram is labelled with the
    host's gender, the surfer's gender and `response_type`, and observes the seconds elapsed since
    the conversation was created.
    """
    # only the host's responses are measured
    assert host_request.host_user_id == user_id

    messages_by_host_so_far = session.execute(
        select(func.count()).where(
            Message.conversation_id == host_request.conversation_id,
            Message.author_id == user_id,
        )
    ).scalar_one_or_none()

    # the host has already written something (or no count came back): not a first response
    if messages_by_host_so_far != 0:
        return

    def gender_of(some_user_id: int):
        return session.execute(select(User.gender).where(User.id == some_user_id)).scalar_one()

    host_gender = gender_of(host_request.host_user_id)
    surfer_gender = gender_of(host_request.surfer_user_id)
    seconds_to_respond = (now() - host_request.conversation.created).total_seconds()
    host_request_first_response_histogram.labels(host_gender, surfer_gender, response_type).observe(
        seconds_to_respond
    )
def _is_host_request_long_enough(text: str) -> bool:
    """
    Returns whether `text` has at least HOST_REQUEST_MIN_LENGTH_UTF16 UTF-16 code units.
    """
    # Python's len(str) counts code points while Javascript's string.length counts UTF-16 code units,
    # e.g. len("😀") == 1 in Python but "😀".length == 2 in Javascript.
    # To agree with the frontend's validation, the text is measured in UTF-16 code units: every code
    # unit is two bytes, and utf-16-le does not prepend a BOM code unit.
    utf16_bytes = text.encode("utf-16-le")
    return len(utf16_bytes) >= 2 * HOST_REQUEST_MIN_LENGTH_UTF16
class Requests(requests_pb2_grpc.RequestsServicer):
    """
    Servicer implementing the host request RPCs: creating, fetching and listing host requests,
    responding to them, exchanging messages, tracking last-seen and archive state, and collecting
    host feedback.

    Every RPC receives the request message, the call context (whose `user_id` is the calling user)
    and a database session.
    """

    def CreateHostRequest(
        self, request: requests_pb2.CreateHostRequestReq, context: CouchersContext, session: Session
    ) -> requests_pb2.CreateHostRequestRes:
        """
        Creates a host request from the calling user (the surfer) to `request.host_user_id`.

        Aborts when the caller's profile is incomplete, when requesting oneself, when the host is
        not found/visible, when the dates are invalid, when the text is too short, or when the
        caller is rate limited. Otherwise creates the conversation (with a `chat_created` control
        message and the initial text message), a moderation state and the HostRequest row,
        notifies the host and records metrics.

        Returns the id of the new host request (its conversation id).
        """
        user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
        if not has_completed_profile(session, user):
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_request")

        if request.host_user_id == context.user_id:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_request_self")

        # just to check host exists and is visible
        host = session.execute(
            select(User).where(users_visible(context, User)).where(User.id == request.host_user_id)
        ).scalar_one_or_none()
        if not host:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

        from_date = parse_date(request.from_date)
        to_date = parse_date(request.to_date)

        if not from_date or not to_date:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_date")

        # "today" is evaluated in the host's timezone
        today = today_in_timezone(host.timezone)

        # request starts from the past
        if from_date < today:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_before_today")

        # from_date is not >= to_date
        if from_date >= to_date:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_to")

        # No need to check today > to_date

        if from_date - today > timedelta(days=365):
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_one_year")

        if to_date - from_date > timedelta(days=365):
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_to_after_one_year")

        # Check minimum length
        if not _is_host_request_long_enough(request.text):
            context.abort_with_error_code(
                grpc.StatusCode.INVALID_ARGUMENT,
                "host_request_too_short",
                substitutions={"chars": str(HOST_REQUEST_MIN_LENGTH_UTF16)},
            )

        # Check if user has been sending host requests excessively
        if process_rate_limits_and_check_abort(
            session=session, user_id=context.user_id, action=RateLimitAction.host_request
        ):
            context.abort_with_error_code(
                grpc.StatusCode.RESOURCE_EXHAUSTED,
                "host_request_rate_limit",
                substitutions={"hours": str(RATE_LIMIT_HOURS)},
            )

        conversation = Conversation()
        session.add(conversation)
        # flush so that conversation.id is available below
        session.flush()

        session.add(
            Message(
                conversation_id=conversation.id,
                author_id=context.user_id,
                message_type=MessageType.chat_created,
            )
        )

        message = Message(
            conversation_id=conversation.id,
            author_id=context.user_id,
            text=request.text,
            message_type=MessageType.text,
        )
        session.add(message)
        # flush so that message.id is available below
        session.flush()

        # Create moderation state for UMS (starts as SHADOWED)
        moderation_state = create_moderation(
            session=session,
            object_type=ModerationObjectType.HOST_REQUEST,
            object_id=conversation.id,
            creator_user_id=context.user_id,
        )

        host_request = HostRequest(
            conversation_id=conversation.id,
            surfer_user_id=context.user_id,
            host_user_id=host.id,
            moderation_state_id=moderation_state.id,
            from_date=from_date,
            to_date=to_date,
            status=HostRequestStatus.pending,
            # the surfer wrote the initial message, so they have seen it
            surfer_last_seen_message_id=message.id,
            # TODO: tz
            # timezone=host.timezone,
            hosting_city=host.city,
            hosting_location=host.geom,
            hosting_radius=host.geom_radius,
        )
        session.add(host_request)
        session.flush()

        notify(
            session,
            user_id=host_request.host_user_id,
            topic_action=NotificationTopicAction.host_request__create,
            key=str(host_request.conversation_id),
            data=notification_data_pb2.HostRequestCreate(
                host_request=host_request_to_pb(host_request, session, context),
                surfer=user_model_to_pb(host_request.surfer, session, context),
                text=request.text,
            ),
            moderation_state_id=moderation_state.id,
        )

        host_requests_sent_counter.labels(user.gender, host.gender).inc()
        sent_messages_counter.labels(user.gender, "host request send").inc()
        account_age_on_host_request_create_histogram.labels(user.gender, host.gender).observe(
            (now() - user.joined).total_seconds()
        )

        return requests_pb2.CreateHostRequestRes(host_request_id=host_request.conversation_id)

    def GetHostRequest(
        self, request: requests_pb2.GetHostRequestReq, context: CouchersContext, session: Session
    ) -> requests_pb2.HostRequest:
        """
        Returns the host request with id `request.host_request_id`, provided the caller is its
        surfer or host and it passes the user-visibility and moderation filters.

        Aborts with NOT_FOUND otherwise.
        """
        host_request = session.execute(
            where_moderated_content_visible(
                where_users_column_visible(
                    where_users_column_visible(
                        select(HostRequest),
                        context,
                        HostRequest.surfer_user_id,
                    ),
                    context,
                    HostRequest.host_user_id,
                ),
                context,
                HostRequest,
                is_list_operation=False,
            )
            .where(HostRequest.conversation_id == request.host_request_id)
            .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id))
        ).scalar_one_or_none()

        if not host_request:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        return host_request_to_pb(host_request, session, context)

    def ListHostRequests(
        self, request: requests_pb2.ListHostRequestsReq, context: CouchersContext, session: Session
    ) -> requests_pb2.ListHostRequestsRes:
        """
        Lists the caller's host requests together with the latest message of each, ordered by
        that latest message's id, newest first.

        Filters: `only_sent` / `only_received` restrict to requests where the caller is the
        surfer / host (requesting both aborts with INVALID_ARGUMENT); `only_archived` (when set,
        and neither of the former is) matches the caller's own archive flag; `only_active`
        restricts to pending/accepted/confirmed requests that have not ended yet.

        Pagination: at most `request.number` results (default DEFAULT_PAGINATION_LENGTH, capped
        at MAX_PAGE_SIZE), continuing below the latest-message id given in `last_request_id`.
        """
        if request.only_sent and request.only_received:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received")

        pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
        pagination = min(pagination, MAX_PAGE_SIZE)

        # By outer joining messages on itself where the second id is bigger, only the highest IDs will have
        # none as message_2.id. So just filter for these to get the highest messages only.
        # See https://stackoverflow.com/a/27802817/6115336
        message_2 = aliased(Message)
        statement = where_moderated_content_visible(
            where_users_column_visible(
                where_users_column_visible(
                    select(Message, HostRequest, Conversation)
                    .outerjoin(
                        message_2, and_(Message.conversation_id == message_2.conversation_id, Message.id < message_2.id)
                    )
                    .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
                    .join(Conversation, Conversation.id == HostRequest.conversation_id),
                    context,
                    HostRequest.surfer_user_id,
                ),
                context,
                HostRequest.host_user_id,
            ),
            context,
            HostRequest,
            is_list_operation=True,
        ).where(message_2.id.is_(None))

        if request.last_request_id != 0:
            statement = statement.where(Message.id < request.last_request_id)
        if request.only_sent:
            statement = statement.where(HostRequest.surfer_user_id == context.user_id)
        elif request.only_received:
            statement = statement.where(HostRequest.host_user_id == context.user_id)
        elif request.HasField("only_archived"):
            statement = statement.where(
                or_(
                    and_(
                        HostRequest.surfer_user_id == context.user_id,
                        HostRequest.is_surfer_archived == request.only_archived,
                    ),
                    and_(
                        HostRequest.host_user_id == context.user_id,
                        HostRequest.is_host_archived == request.only_archived,
                    ),
                )
            )
        else:
            statement = statement.where(
                or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id)
            )

        # TODO: I considered having the latest control message be the single source of truth for
        # the HostRequest.status, but decided against it because of this filter.
        # Another possibility is to filter in the python instead of SQL, but that's slower
        if request.only_active:
            statement = statement.where(
                or_(
                    HostRequest.status == HostRequestStatus.pending,
                    HostRequest.status == HostRequestStatus.accepted,
                    HostRequest.status == HostRequestStatus.confirmed,
                )
            )
            # An active request has not ended yet. RespondHostRequest treats end_time < now() as
            # "in the past", so the active filter is the complement of that.
            statement = statement.where(HostRequest.end_time >= func.now())

        # fetch one extra row to find out whether there is another page
        statement = statement.order_by(Message.id.desc()).limit(pagination + 1)
        results = session.execute(statement).all()

        host_requests = []
        for result in results[:pagination]:
            lat, lng = get_coordinates(result.HostRequest.hosting_location)
            host_requests.append(
                requests_pb2.HostRequest(
                    host_request_id=result.HostRequest.conversation_id,
                    surfer_user_id=result.HostRequest.surfer_user_id,
                    host_user_id=result.HostRequest.host_user_id,
                    status=hostrequeststatus2api[result.HostRequest.status],
                    created=Timestamp_from_datetime(result.Conversation.created),
                    from_date=date_to_api(result.HostRequest.from_date),
                    to_date=date_to_api(result.HostRequest.to_date),
                    last_seen_message_id=(
                        result.HostRequest.surfer_last_seen_message_id
                        if context.user_id == result.HostRequest.surfer_user_id
                        else result.HostRequest.host_last_seen_message_id
                    ),
                    latest_message=message_to_pb(result.Message),
                    hosting_city=result.HostRequest.hosting_city,
                    hosting_lat=lat,
                    hosting_lng=lng,
                    hosting_radius=result.HostRequest.hosting_radius,
                )
            )

        # the cursor is only reported when there is a further page to fetch
        last_request_id = min(g.Message.id for g in results[:pagination]) if len(results) > pagination else 0  # TODO
        no_more = len(results) <= pagination

        return requests_pb2.ListHostRequestsRes(
            last_request_id=last_request_id, no_more=no_more, host_requests=host_requests
        )

    def RespondHostRequest(
        self, request: requests_pb2.RespondHostRequestReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """
        Changes the status of a host request on behalf of the caller and optionally attaches a
        text message.

        The host may accept (unless the request is cancelled, confirmed or already accepted) or
        reject (unless cancelled or already rejected); the surfer may confirm (only an accepted
        request) or cancel (unless rejected or already cancelled). Setting the status back to
        pending is never allowed, and requests whose end time has passed cannot be changed.

        A `host_request_status_changed` control message is stored, the other party is notified,
        metrics are recorded and the caller's last-seen marker is moved to the newest message
        written here.
        """

        def count_host_response(other_user_id: int, response_type: str) -> None:
            # records the response metrics, labelled by the caller's and the other party's gender
            user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
            other_gender = session.execute(select(User.gender).where(User.id == other_user_id)).scalar_one()
            host_request_responses_counter.labels(user_gender, other_gender, response_type).inc()
            sent_messages_counter.labels(user_gender, "host request response").inc()

        host_request = session.execute(
            where_moderated_content_visible(
                where_users_column_visible(
                    where_users_column_visible(
                        select(HostRequest),
                        context,
                        HostRequest.surfer_user_id,
                    ),
                    context,
                    HostRequest.host_user_id,
                ),
                context,
                HostRequest,
                is_list_operation=False,
            ).where(HostRequest.conversation_id == request.host_request_id)
        ).scalar_one_or_none()

        if not host_request:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        # third parties get the same NOT_FOUND as for a missing request
        if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        if request.status == conversations_pb2.HOST_REQUEST_STATUS_PENDING:
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")

        if host_request.end_time < now():
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_in_past")

        # the status target is filled in by whichever branch below applies
        control_message = Message(
            message_type=MessageType.host_request_status_changed,
            conversation_id=host_request.conversation_id,
            author_id=context.user_id,
        )

        if request.status == conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED:
            # only host can accept
            if context.user_id != host_request.host_user_id:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "not_the_host")
            # can't accept a cancelled or confirmed request (only reject), or already accepted
            if (
                host_request.status == HostRequestStatus.cancelled
                or host_request.status == HostRequestStatus.confirmed
                or host_request.status == HostRequestStatus.accepted
            ):
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            _possibly_observe_first_response_time(session, host_request, context.user_id, "accepted")
            control_message.host_request_status_target = HostRequestStatus.accepted
            host_request.status = HostRequestStatus.accepted
            session.flush()

            notify(
                session,
                user_id=host_request.surfer_user_id,
                topic_action=NotificationTopicAction.host_request__accept,
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestAccept(
                    host_request=host_request_to_pb(host_request, session, context),
                    host=user_model_to_pb(host_request.host, session, context),
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

            count_host_response(host_request.surfer_user_id, "accepted")

        if request.status == conversations_pb2.HOST_REQUEST_STATUS_REJECTED:
            # only host can reject
            if context.user_id != host_request.host_user_id:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            # can't reject a cancelled or already rejected request
            if host_request.status == HostRequestStatus.cancelled or host_request.status == HostRequestStatus.rejected:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            _possibly_observe_first_response_time(session, host_request, context.user_id, "rejected")
            control_message.host_request_status_target = HostRequestStatus.rejected
            host_request.status = HostRequestStatus.rejected
            session.flush()

            notify(
                session,
                user_id=host_request.surfer_user_id,
                topic_action=NotificationTopicAction.host_request__reject,
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestReject(
                    host_request=host_request_to_pb(host_request, session, context),
                    host=user_model_to_pb(host_request.host, session, context),
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

            count_host_response(host_request.surfer_user_id, "rejected")

        if request.status == conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED:
            # only surfer can confirm
            if context.user_id != host_request.surfer_user_id:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            # can only confirm an accepted request
            if host_request.status != HostRequestStatus.accepted:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            control_message.host_request_status_target = HostRequestStatus.confirmed
            host_request.status = HostRequestStatus.confirmed
            session.flush()

            notify(
                session,
                user_id=host_request.host_user_id,
                topic_action=NotificationTopicAction.host_request__confirm,
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestConfirm(
                    host_request=host_request_to_pb(host_request, session, context),
                    surfer=user_model_to_pb(host_request.surfer, session, context),
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

            count_host_response(host_request.host_user_id, "confirmed")

        if request.status == conversations_pb2.HOST_REQUEST_STATUS_CANCELLED:
            # only surfer can cancel
            if context.user_id != host_request.surfer_user_id:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            # can't cancel an already cancelled or rejected request
            if host_request.status == HostRequestStatus.rejected or host_request.status == HostRequestStatus.cancelled:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            control_message.host_request_status_target = HostRequestStatus.cancelled
            host_request.status = HostRequestStatus.cancelled
            session.flush()

            notify(
                session,
                user_id=host_request.host_user_id,
                topic_action=NotificationTopicAction.host_request__cancel,
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestCancel(
                    host_request=host_request_to_pb(host_request, session, context),
                    surfer=user_model_to_pb(host_request.surfer, session, context),
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

            count_host_response(host_request.host_user_id, "cancelled")

        session.add(control_message)

        if request.text:
            latest_message = Message(
                conversation_id=host_request.conversation_id,
                text=request.text,
                author_id=context.user_id,
                message_type=MessageType.text,
            )

            session.add(latest_message)
        else:
            latest_message = control_message

        # flush so that latest_message.id is available below
        session.flush()

        # the caller has by definition seen what they just wrote
        if host_request.surfer_user_id == context.user_id:
            host_request.surfer_last_seen_message_id = latest_message.id
        else:
            host_request.host_last_seen_message_id = latest_message.id
        session.commit()

        return empty_pb2.Empty()

    def GetHostRequestMessages(
        self, request: requests_pb2.GetHostRequestMessagesReq, context: CouchersContext, session: Session
    ) -> requests_pb2.GetHostRequestMessagesRes:
        """
        Returns a page of messages of a host request, newest first.

        Aborts with NOT_FOUND unless the request exists, passes the moderation filter and the
        caller is its surfer or host. Returns at most `request.number` messages (default
        DEFAULT_PAGINATION_LENGTH, capped at MAX_PAGE_SIZE) with ids below
        `request.last_message_id`; a `last_message_id` of 0 starts from the newest message.
        """
        host_request = session.execute(
            where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False).where(
                HostRequest.conversation_id == request.host_request_id
            )
        ).scalar_one_or_none()

        if not host_request:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
        pagination = min(pagination, MAX_PAGE_SIZE)

        messages = (
            session.execute(
                select(Message)
                .where(Message.conversation_id == host_request.conversation_id)
                .where(or_(Message.id < request.last_message_id, to_bool(request.last_message_id == 0)))
                .order_by(Message.id.desc())
                # fetch one extra row to find out whether there is another page
                .limit(pagination + 1)
            )
            .scalars()
            .all()
        )

        no_more = len(messages) <= pagination

        last_message_id = min(m.id if m else 1 for m in messages[:pagination]) if len(messages) > 0 else 0

        return requests_pb2.GetHostRequestMessagesRes(
            last_message_id=last_message_id,
            no_more=no_more,
            messages=[message_to_pb(message) for message in messages[:pagination]],
        )

    def SendHostRequestMessage(
        self, request: requests_pb2.SendHostRequestMessageReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """
        Adds a text message from the caller to a host request's conversation.

        Aborts with INVALID_ARGUMENT for empty text and with NOT_FOUND unless the request exists,
        passes the moderation filter and the caller is its surfer or host. Moves the caller's
        last-seen marker to the new message, notifies the other party and records metrics
        (including the host's first-response time when applicable).
        """
        if request.text == "":
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")
        host_request = session.execute(
            where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False).where(
                HostRequest.conversation_id == request.host_request_id
            )
        ).scalar_one_or_none()

        if not host_request:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        # must run before the new message is stored, as it counts the host's existing messages
        if host_request.host_user_id == context.user_id:
            _possibly_observe_first_response_time(session, host_request, context.user_id, "message")

        message = Message(
            conversation_id=host_request.conversation_id,
            author_id=context.user_id,
            message_type=MessageType.text,
            text=request.text,
        )

        session.add(message)
        # flush so that message.id is available below
        session.flush()

        if host_request.surfer_user_id == context.user_id:
            host_request.surfer_last_seen_message_id = message.id

            # the surfer wrote, so the host is notified
            notify(
                session,
                user_id=host_request.host_user_id,
                topic_action=NotificationTopicAction.host_request__message,
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestMessage(
                    host_request=host_request_to_pb(host_request, session, context),
                    user=user_model_to_pb(host_request.surfer, session, context),
                    text=request.text,
                    am_host=True,
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

        else:
            host_request.host_last_seen_message_id = message.id

            # the host wrote, so the surfer is notified
            notify(
                session,
                user_id=host_request.surfer_user_id,
                topic_action=NotificationTopicAction.host_request__message,
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestMessage(
                    host_request=host_request_to_pb(host_request, session, context),
                    user=user_model_to_pb(host_request.host, session, context),
                    text=request.text,
                    am_host=False,
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

        session.commit()

        user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
        sent_messages_counter.labels(user_gender, "host request").inc()

        return empty_pb2.Empty()

    def GetHostRequestUpdates(
        self, request: requests_pb2.GetHostRequestUpdatesReq, context: CouchersContext, session: Session
    ) -> requests_pb2.GetHostRequestUpdatesRes:
        """
        Returns messages newer than `request.newest_message_id` across the caller's host requests,
        oldest first, each paired with its host request's id and current status.

        `only_sent` / `only_received` restrict to requests where the caller is the surfer / host
        (requesting both aborts with INVALID_ARGUMENT). Aborts with INVALID_ARGUMENT when
        `newest_message_id` is 0 or does not refer to an existing message. Returns at most
        `request.number` updates (default DEFAULT_PAGINATION_LENGTH, capped at MAX_PAGE_SIZE).
        """
        if request.only_sent and request.only_received:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received")

        if request.newest_message_id == 0:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

        if not session.execute(select(Message).where(Message.id == request.newest_message_id)).scalar_one_or_none():
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

        pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
        pagination = min(pagination, MAX_PAGE_SIZE)

        statement = where_moderated_content_visible(
            select(
                Message,
                HostRequest.status.label("host_request_status"),
                HostRequest.conversation_id.label("host_request_id"),
            )
            .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
            .where(Message.id > request.newest_message_id),
            context,
            HostRequest,
            is_list_operation=False,
        )

        if request.only_sent:
            statement = statement.where(HostRequest.surfer_user_id == context.user_id)
        elif request.only_received:
            statement = statement.where(HostRequest.host_user_id == context.user_id)
        else:
            statement = statement.where(
                or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id)
            )

        # fetch one extra row to find out whether there is another page
        statement = statement.order_by(Message.id.asc()).limit(pagination + 1)
        res = session.execute(statement).all()

        no_more = len(res) <= pagination

        return requests_pb2.GetHostRequestUpdatesRes(
            no_more=no_more,
            updates=[
                requests_pb2.HostRequestUpdate(
                    host_request_id=result.host_request_id,
                    status=hostrequeststatus2api[result.host_request_status],
                    message=message_to_pb(result.Message),
                )
                for result in res[:pagination]
            ],
        )

    def MarkLastSeenHostRequest(
        self, request: requests_pb2.MarkLastSeenHostRequestReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """
        Moves the caller's last-seen marker on a host request to `request.last_seen_message_id`.

        Aborts with NOT_FOUND unless the request exists, passes the moderation filter and the
        caller is its surfer or host, and with FAILED_PRECONDITION when the new marker would be
        lower than the current one.
        """
        host_request = session.execute(
            where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False).where(
                HostRequest.conversation_id == request.host_request_id
            )
        ).scalar_one_or_none()

        if not host_request:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        if host_request.surfer_user_id == context.user_id:
            if not host_request.surfer_last_seen_message_id <= request.last_seen_message_id:
                context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
            host_request.surfer_last_seen_message_id = request.last_seen_message_id
        else:
            if not host_request.host_last_seen_message_id <= request.last_seen_message_id:
                context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
            host_request.host_last_seen_message_id = request.last_seen_message_id

        session.commit()
        return empty_pb2.Empty()

    def SetHostRequestArchiveStatus(
        self, request: requests_pb2.SetHostRequestArchiveStatusReq, context: CouchersContext, session: Session
    ) -> requests_pb2.SetHostRequestArchiveStatusRes:
        """
        Sets the caller's own archive flag on a host request to `request.is_archived`.

        The surfer's and the host's flags are independent. Aborts with NOT_FOUND unless the
        request exists, passes the moderation filter and the caller is its surfer or host.
        Returns the host request id together with the archive status that was set.
        """
        host_request = session.execute(
            where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False)
            .where(HostRequest.conversation_id == request.host_request_id)
            .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id))
        ).scalar_one_or_none()

        if not host_request:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        if context.user_id == host_request.surfer_user_id:
            host_request.is_surfer_archived = request.is_archived
        else:
            host_request.is_host_archived = request.is_archived

        return requests_pb2.SetHostRequestArchiveStatusRes(
            host_request_id=host_request.conversation_id,
            is_archived=request.is_archived,
        )

    def GetResponseRate(
        self, request: requests_pb2.GetResponseRateReq, context: CouchersContext, session: Session
    ) -> requests_pb2.GetResponseRateRes:
        """
        Returns the response rate of the user `request.user_id`.

        Aborts with NOT_FOUND when that user does not exist or is not visible to the caller. The
        response rate row is outer joined, so it may be missing for an existing user; it is handed
        to `response_rate_to_pb` as is.
        """
        user_res = session.execute(
            select(User.id, UserResponseRate)
            .outerjoin(UserResponseRate, UserResponseRate.user_id == User.id)
            .where(users_visible(context, User))
            .where(User.id == request.user_id)
        ).one_or_none()

        # abort if the user doesn't exist or isn't visible
        if not user_res:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

        # the user id was only selected to tell "no such user" apart from "no response rate row"
        _, response_rates = user_res
        return requests_pb2.GetResponseRateRes(**response_rate_to_pb(response_rates))  # type: ignore[arg-type]

    def SendHostRequestFeedback(
        self, request: requests_pb2.SendHostRequestFeedbackReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """
        Stores the host's feedback (request quality and decline reason) about a host request.

        Aborts with NOT_FOUND unless the request exists, passes the moderation filter and the
        caller is its host, and with FAILED_PRECONDITION when the caller has already left feedback
        for it. A quality value without an entry in `hostrequestquality2sql` is stored as None.
        """
        host_request = session.execute(
            where_moderated_content_visible(select(HostRequest), context, HostRequest, is_list_operation=False)
            .where(HostRequest.conversation_id == request.host_request_id)
            .where(HostRequest.host_user_id == context.user_id)
        ).scalar_one_or_none()

        if not host_request:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        feedback = session.execute(
            select(HostRequestFeedback)
            .where(HostRequestFeedback.host_request_id == host_request.conversation_id)
            .where(HostRequestFeedback.from_user_id == context.user_id)
        ).scalar_one_or_none()

        if feedback:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_left_host_request_feedback")

        session.add(
            HostRequestFeedback(
                host_request_id=host_request.conversation_id,
                from_user_id=host_request.host_user_id,
                to_user_id=host_request.surfer_user_id,
                request_quality=hostrequestquality2sql.get(request.host_request_quality),
                decline_reason=request.decline_reason,
            )
        )

        return empty_pb2.Empty()