Coverage for src/couchers/servicers/requests.py: 93%
292 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-22 00:46 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-22 00:46 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy import exists
7from sqlalchemy.orm import aliased
8from sqlalchemy.sql import and_, func, or_
10from couchers.constants import HOST_REQUEST_MIN_LENGTH_UTF16
11from couchers.materialized_views import UserResponseRate
12from couchers.metrics import (
13 account_age_on_host_request_create_histogram,
14 host_request_first_response_histogram,
15 host_request_responses_counter,
16 host_requests_sent_counter,
17 sent_messages_counter,
18)
19from couchers.models import (
20 Conversation,
21 HostRequest,
22 HostRequestFeedback,
23 HostRequestQuality,
24 HostRequestStatus,
25 Message,
26 MessageType,
27 ModerationObjectType,
28 RateLimitAction,
29 User,
30)
31from couchers.moderation.utils import create_moderation
32from couchers.notifications.notify import notify
33from couchers.proto import conversations_pb2, notification_data_pb2, requests_pb2, requests_pb2_grpc
34from couchers.rate_limits.check import process_rate_limits_and_check_abort
35from couchers.rate_limits.definitions import RATE_LIMIT_HOURS
36from couchers.servicers.api import response_rate_to_pb, user_model_to_pb
37from couchers.sql import couchers_select as select
38from couchers.utils import (
39 Timestamp_from_datetime,
40 date_to_api,
41 get_coordinates,
42 now,
43 parse_date,
44 today_in_timezone,
45)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Page size used when a request does not ask for a positive number of items.
DEFAULT_PAGINATION_LENGTH = 10
# Upper bound applied to any requested page size.
MAX_PAGE_SIZE = 50


# Maps the database host request status enum to the corresponding API (protobuf) status constant.
hostrequeststatus2api = {
    HostRequestStatus.pending: conversations_pb2.HOST_REQUEST_STATUS_PENDING,
    HostRequestStatus.accepted: conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
    HostRequestStatus.rejected: conversations_pb2.HOST_REQUEST_STATUS_REJECTED,
    HostRequestStatus.confirmed: conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED,
    HostRequestStatus.cancelled: conversations_pb2.HOST_REQUEST_STATUS_CANCELLED,
}

# Maps the API (protobuf) host request quality constant to the database quality enum stored with feedback.
# NOTE(review): QUALITY_LOW maps to okay_quality and QUALITY_OKAY maps to low_quality, and UNSPECIFIED maps
# to high_quality. This looks like it could be swapped; confirm against the proto and model enum definitions
# before relying on it.
hostrequestquality2sql = {
    requests_pb2.HOST_REQUEST_QUALITY_UNSPECIFIED: HostRequestQuality.high_quality,
    requests_pb2.HOST_REQUEST_QUALITY_LOW: HostRequestQuality.okay_quality,
    requests_pb2.HOST_REQUEST_QUALITY_OKAY: HostRequestQuality.low_quality,
}
def message_to_pb(message: Message):
    """
    Builds the protocol buffer representation of the given message.

    Normal messages (as reported by ``message.is_normal_message``) carry their text. Any other message
    carries a "chat created" marker or the host request status it switched to, depending on its type.
    """
    # fields that every message has, regardless of its content
    shared_fields = {
        "message_id": message.id,
        "author_user_id": message.author_id,
        "time": Timestamp_from_datetime(message.time),
    }

    if message.is_normal_message:
        return conversations_pb2.Message(
            text=conversations_pb2.MessageContentText(text=message.text),
            **shared_fields,
        )

    # control message: at most one of the two content fields below gets set
    chat_created = None
    if message.message_type == MessageType.chat_created:
        chat_created = conversations_pb2.MessageContentChatCreated()

    status_changed = None
    if message.message_type == MessageType.host_request_status_changed:
        status_changed = conversations_pb2.MessageContentHostRequestStatusChanged(
            status=hostrequeststatus2api[message.host_request_status_target]
        )

    return conversations_pb2.Message(
        chat_created=chat_created,
        host_request_status_changed=status_changed,
        **shared_fields,
    )
def host_request_to_pb(host_request: HostRequest, session, context):
    """
    Builds the protocol buffer representation of a host request, as seen by the calling user.

    Looks up the first and the latest message of the request's conversation: the first one provides the
    creation time, the latest one is embedded in the result. The last seen message id is the surfer's or
    the host's, depending on who ``context.user_id`` is. ``need_host_request_feedback`` is set when the
    caller is the host of a rejected request and has not left feedback for it yet.
    """
    conversation_messages = select(Message).where(Message.conversation_id == host_request.conversation_id)

    # lowest message id in the conversation
    first_message = session.execute(conversation_messages.order_by(Message.id.asc()).limit(1)).scalar_one()
    # highest message id in the conversation
    newest_message = session.execute(conversation_messages.order_by(Message.id.desc()).limit(1)).scalar_one()

    lat, lng = get_coordinates(host_request.hosting_location)

    # feedback is only asked from the host, and only about requests they rejected
    need_feedback = False
    if context.user_id == host_request.host_user_id and host_request.status == HostRequestStatus.rejected:
        feedback_already_left = session.execute(
            select(
                exists().where(
                    HostRequestFeedback.from_user_id == context.user_id,
                    HostRequestFeedback.host_request_id == host_request.conversation_id,
                )
            )
        ).scalar_one()
        need_feedback = not feedback_already_left

    if context.user_id == host_request.surfer_user_id:
        last_seen_message_id = host_request.surfer_last_seen_message_id
    else:
        last_seen_message_id = host_request.host_last_seen_message_id

    return requests_pb2.HostRequest(
        host_request_id=host_request.conversation_id,
        surfer_user_id=host_request.surfer_user_id,
        host_user_id=host_request.host_user_id,
        status=hostrequeststatus2api[host_request.status],
        created=Timestamp_from_datetime(first_message.time),
        from_date=date_to_api(host_request.from_date),
        to_date=date_to_api(host_request.to_date),
        last_seen_message_id=last_seen_message_id,
        latest_message=message_to_pb(newest_message),
        hosting_city=host_request.hosting_city,
        hosting_lat=lat,
        hosting_lng=lng,
        hosting_radius=host_request.hosting_radius,
        need_host_request_feedback=need_feedback,
    )
def _possibly_observe_first_response_time(session, host_request, user_id, response_type):
    """
    Records the host's first response time in the metrics histogram, if this is their first response.

    The host is considered to be responding for the first time when they have not authored any message in
    the request's conversation yet. The observed value is the number of seconds since the conversation
    was created, labelled with the host's gender, the surfer's gender and ``response_type``.
    """
    # only the host's response time is measured
    assert host_request.host_user_id == user_id

    messages_by_host = (
        select(func.count())
        .where(Message.conversation_id == host_request.conversation_id)
        .where(Message.author_id == user_id)
    )
    host_message_count = session.execute(messages_by_host).scalar_one_or_none()

    # the host has written in this conversation before, so this is not their first response
    if host_message_count != 0:
        return

    def gender_of(some_user_id):
        return session.execute(select(User.gender).where(User.id == some_user_id)).scalar_one()

    host_gender = gender_of(host_request.host_user_id)
    surfer_gender = gender_of(host_request.surfer_user_id)
    seconds_until_response = (now() - host_request.conversation.created).total_seconds()
    host_request_first_response_histogram.labels(host_gender, surfer_gender, response_type).observe(
        seconds_until_response
    )
def _is_host_request_long_enough(text: str) -> bool:
    """
    Returns whether the text has at least HOST_REQUEST_MIN_LENGTH_UTF16 UTF-16 code units.
    """
    # Python's len(str) counts code points while Javascript's string.length counts UTF-16 code units,
    # so the two disagree for characters outside the Basic Multilingual Plane,
    # e.g. len("\U0001F600") == 1 in Python but the same string has length 2 in Javascript.
    # To match the frontend's validation, the text is measured in UTF-16 code units instead.
    # utf-16-le is used because it does not prepend a BOM; every code unit takes two bytes.
    utf16_bytes = text.encode("utf-16-le")
    code_unit_count = len(utf16_bytes) // 2
    return code_unit_count >= HOST_REQUEST_MIN_LENGTH_UTF16
class Requests(requests_pb2_grpc.RequestsServicer):
    """
    Servicer for the Requests gRPC service: creating, listing, answering and messaging in host requests.

    Every method receives the protobuf ``request``, the call ``context`` (which provides ``user_id`` of the
    caller and ``abort_with_error_code``) and a database ``session``.
    """

    def CreateHostRequest(self, request, context, session):
        """
        Creates a new host request from the calling user (the surfer) to ``request.host_user_id``.

        Validates the caller's profile, the host, the dates and the text length, applies the host request
        rate limit, then creates the conversation, its first messages, the moderation state and the host
        request itself, and notifies the host.

        Returns a ``CreateHostRequestRes`` with the id of the new host request.
        """
        user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
        if not user.has_completed_profile:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_send_request")

        if request.host_user_id == context.user_id:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cant_request_self")

        # just to check host exists and is visible
        host = session.execute(
            select(User).where_users_visible(context).where(User.id == request.host_user_id)
        ).scalar_one_or_none()
        if not host:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

        from_date = parse_date(request.from_date)
        to_date = parse_date(request.to_date)

        if not from_date or not to_date:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_date")

        # "today" is evaluated in the host's timezone
        today = today_in_timezone(host.timezone)

        # request starts from the past
        if from_date < today:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_before_today")

        # the stay must end strictly after it starts
        if from_date >= to_date:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_to")

        # No need to check today > to_date

        if from_date - today > timedelta(days=365):
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_from_after_one_year")

        if to_date - from_date > timedelta(days=365):
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "date_to_after_one_year")

        # Check minimum length
        if not _is_host_request_long_enough(request.text):
            context.abort_with_error_code(
                grpc.StatusCode.INVALID_ARGUMENT,
                "host_request_too_short",
                substitutions={"chars": HOST_REQUEST_MIN_LENGTH_UTF16},
            )

        # Check if user has been sending host requests excessively
        if process_rate_limits_and_check_abort(
            session=session, user_id=context.user_id, action=RateLimitAction.host_request
        ):
            context.abort_with_error_code(
                grpc.StatusCode.RESOURCE_EXHAUSTED,
                "host_request_rate_limit",
                substitutions={"hours": RATE_LIMIT_HOURS},
            )

        conversation = Conversation()
        session.add(conversation)
        # flush so that conversation.id is available
        session.flush()

        # the conversation starts with a "chat created" control message...
        session.add(
            Message(
                conversation_id=conversation.id,
                author_id=context.user_id,
                message_type=MessageType.chat_created,
            )
        )

        # ...followed by the surfer's request text
        message = Message(
            conversation_id=conversation.id,
            author_id=context.user_id,
            text=request.text,
            message_type=MessageType.text,
        )
        session.add(message)
        # flush so that message.id is available
        session.flush()

        # Create moderation state for UMS (starts as SHADOWED)
        moderation_state = create_moderation(
            session=session,
            object_type=ModerationObjectType.HOST_REQUEST,
            object_id=conversation.id,
            creator_user_id=context.user_id,
        )

        host_request = HostRequest(
            conversation_id=conversation.id,
            surfer_user_id=context.user_id,
            host_user_id=host.id,
            moderation_state_id=moderation_state.id,
            from_date=from_date,
            to_date=to_date,
            status=HostRequestStatus.pending,
            # the surfer has by definition seen their own message
            surfer_last_seen_message_id=message.id,
            # TODO: tz
            # timezone=host.timezone,
            hosting_city=host.city,
            hosting_location=host.geom,
            hosting_radius=host.geom_radius,
        )
        session.add(host_request)
        session.flush()

        notify(
            session,
            user_id=host_request.host_user_id,
            topic_action="host_request:create",
            key=str(host_request.conversation_id),
            data=notification_data_pb2.HostRequestCreate(
                host_request=host_request_to_pb(host_request, session, context),
                surfer=user_model_to_pb(host_request.surfer, session, context),
                text=request.text,
            ),
            moderation_state_id=moderation_state.id,
        )

        host_requests_sent_counter.labels(user.gender, host.gender).inc()
        sent_messages_counter.labels(user.gender, "host request send").inc()
        account_age_on_host_request_create_histogram.labels(user.gender, host.gender).observe(
            (now() - user.joined).total_seconds()
        )

        return requests_pb2.CreateHostRequestRes(host_request_id=host_request.conversation_id)

    def GetHostRequest(self, request, context, session):
        """
        Returns the host request with id ``request.host_request_id``.

        Aborts with NOT_FOUND unless the request exists, is visible to the caller and the caller is its
        surfer or its host.
        """
        host_request = session.execute(
            select(HostRequest)
            .where_users_column_visible(context, HostRequest.surfer_user_id)
            .where_users_column_visible(context, HostRequest.host_user_id)
            .where_moderated_content_visible(context, HostRequest, is_list_operation=False)
            .where(HostRequest.conversation_id == request.host_request_id)
            .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id))
        ).scalar_one_or_none()

        if not host_request:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        return host_request_to_pb(host_request, session, context)

    def ListHostRequests(self, request, context, session):
        """
        Lists the caller's host requests together with the latest message of each, newest message first.

        Supports filtering by sent/received (mutually exclusive), archived state and active status.
        Pagination is keyed on the id of the latest message: ``request.last_request_id`` of 0 starts from
        the newest, and the returned ``last_request_id`` is 0 when there are no more results.
        """
        if request.only_sent and request.only_received:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received")

        pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
        pagination = min(pagination, MAX_PAGE_SIZE)

        # By outer joining messages on itself where the second id is bigger, only the highest IDs will have
        # none as message_2.id. So just filter for these ones to get highest messages only.
        # See https://stackoverflow.com/a/27802817/6115336
        message_2 = aliased(Message)
        statement = (
            select(Message, HostRequest, Conversation)
            .outerjoin(message_2, and_(Message.conversation_id == message_2.conversation_id, Message.id < message_2.id))
            .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
            .join(Conversation, Conversation.id == HostRequest.conversation_id)
            .where_users_column_visible(context, HostRequest.surfer_user_id)
            .where_users_column_visible(context, HostRequest.host_user_id)
            .where_moderated_content_visible(context, HostRequest, is_list_operation=True)
            # this must stay a `==` comparison: it builds an SQL "IS NULL" expression
            .where(message_2.id == None)
            .where(or_(Message.id < request.last_request_id, request.last_request_id == 0))
        )

        if request.only_sent:
            statement = statement.where(HostRequest.surfer_user_id == context.user_id)
        elif request.only_received:
            statement = statement.where(HostRequest.host_user_id == context.user_id)
        elif request.HasField("only_archived"):
            # the archived flag is tracked separately for the surfer and for the host
            statement = statement.where(
                or_(
                    and_(
                        HostRequest.surfer_user_id == context.user_id,
                        HostRequest.is_surfer_archived == request.only_archived,
                    ),
                    and_(
                        HostRequest.host_user_id == context.user_id,
                        HostRequest.is_host_archived == request.only_archived,
                    ),
                )
            )
        else:
            statement = statement.where(
                or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id)
            )

        # TODO: I considered having the latest control message be the single source of truth for
        # the HostRequest.status, but decided against it because of this filter.
        # Another possibility is to filter in the python instead of SQL, but that's slower
        if request.only_active:
            statement = statement.where(
                or_(
                    HostRequest.status == HostRequestStatus.pending,
                    HostRequest.status == HostRequestStatus.accepted,
                    HostRequest.status == HostRequestStatus.confirmed,
                )
            )
            # NOTE(review): this keeps only requests whose end_time is not after the current time, which
            # looks inverted for an "active" filter. Behaviour is left unchanged; confirm the intended
            # comparison.
            statement = statement.where(HostRequest.end_time <= func.now())

        # fetch one extra row to find out whether there is another page
        statement = statement.order_by(Message.id.desc()).limit(pagination + 1)
        results = session.execute(statement).all()

        host_requests = []
        for result in results[:pagination]:
            lat, lng = get_coordinates(result.HostRequest.hosting_location)
            host_requests.append(
                requests_pb2.HostRequest(
                    host_request_id=result.HostRequest.conversation_id,
                    surfer_user_id=result.HostRequest.surfer_user_id,
                    host_user_id=result.HostRequest.host_user_id,
                    status=hostrequeststatus2api[result.HostRequest.status],
                    created=Timestamp_from_datetime(result.Conversation.created),
                    from_date=date_to_api(result.HostRequest.from_date),
                    to_date=date_to_api(result.HostRequest.to_date),
                    last_seen_message_id=(
                        result.HostRequest.surfer_last_seen_message_id
                        if context.user_id == result.HostRequest.surfer_user_id
                        else result.HostRequest.host_last_seen_message_id
                    ),
                    latest_message=message_to_pb(result.Message),
                    hosting_city=result.HostRequest.hosting_city,
                    hosting_lat=lat,
                    hosting_lng=lng,
                    hosting_radius=result.HostRequest.hosting_radius,
                )
            )

        last_request_id = min(g.Message.id for g in results[:pagination]) if len(results) > pagination else 0  # TODO
        no_more = len(results) <= pagination

        return requests_pb2.ListHostRequestsRes(
            last_request_id=last_request_id, no_more=no_more, host_requests=host_requests
        )

    def RespondHostRequest(self, request, context, session):
        """
        Changes the status of a host request and optionally attaches a text message.

        The host may accept or reject, the surfer may confirm or cancel; each transition is only allowed
        from certain current statuses. A control message recording the new status is added to the
        conversation, the other party is notified, and the caller's last seen message is advanced.

        Aborts with NOT_FOUND if the request is not visible to or does not involve the caller, with
        PERMISSION_DENIED for disallowed or unsupported target statuses, and with INVALID_ARGUMENT if the
        request has already ended.
        """

        def count_host_response(other_user_id, response_type):
            # bumps the response and sent message counters, labelled with both users' genders
            user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
            other_gender = session.execute(select(User.gender).where(User.id == other_user_id)).scalar_one()
            host_request_responses_counter.labels(user_gender, other_gender, response_type).inc()
            sent_messages_counter.labels(user_gender, "host request response").inc()

        host_request = session.execute(
            select(HostRequest)
            .where_users_column_visible(context, HostRequest.surfer_user_id)
            .where_users_column_visible(context, HostRequest.host_user_id)
            .where_moderated_content_visible(context, HostRequest, is_list_operation=False)
            .where(HostRequest.conversation_id == request.host_request_id)
        ).scalar_one_or_none()

        if not host_request:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        # Only these four transitions are handled below. Anything else (pending, unspecified or an unknown
        # value) must be rejected here: otherwise a status-changed control message without a target status
        # would be stored, and message_to_pb cannot convert such a message.
        respondable_statuses = (
            conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
            conversations_pb2.HOST_REQUEST_STATUS_REJECTED,
            conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED,
            conversations_pb2.HOST_REQUEST_STATUS_CANCELLED,
        )
        if request.status not in respondable_statuses:
            context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")

        if host_request.end_time < now():
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_in_past")

        control_message = Message()

        if request.status == conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED:
            # only host can accept
            if context.user_id != host_request.host_user_id:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "not_the_host")
            # can't accept a cancelled or confirmed request (only reject), or already accepted
            if (
                host_request.status == HostRequestStatus.cancelled
                or host_request.status == HostRequestStatus.confirmed
                or host_request.status == HostRequestStatus.accepted
            ):
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            _possibly_observe_first_response_time(session, host_request, context.user_id, "accepted")
            control_message.host_request_status_target = HostRequestStatus.accepted
            host_request.status = HostRequestStatus.accepted
            session.flush()

            notify(
                session,
                user_id=host_request.surfer_user_id,
                topic_action="host_request:accept",
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestAccept(
                    host_request=host_request_to_pb(host_request, session, context),
                    host=user_model_to_pb(host_request.host, session, context),
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

            count_host_response(host_request.surfer_user_id, "accepted")

        if request.status == conversations_pb2.HOST_REQUEST_STATUS_REJECTED:
            # only host can reject
            if context.user_id != host_request.host_user_id:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            # can't reject a cancelled or already rejected request
            if host_request.status == HostRequestStatus.cancelled or host_request.status == HostRequestStatus.rejected:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            _possibly_observe_first_response_time(session, host_request, context.user_id, "rejected")
            control_message.host_request_status_target = HostRequestStatus.rejected
            host_request.status = HostRequestStatus.rejected
            session.flush()

            notify(
                session,
                user_id=host_request.surfer_user_id,
                topic_action="host_request:reject",
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestReject(
                    host_request=host_request_to_pb(host_request, session, context),
                    host=user_model_to_pb(host_request.host, session, context),
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

            count_host_response(host_request.surfer_user_id, "rejected")

        if request.status == conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED:
            # only surfer can confirm
            if context.user_id != host_request.surfer_user_id:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            # can only confirm an accepted request
            if host_request.status != HostRequestStatus.accepted:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            control_message.host_request_status_target = HostRequestStatus.confirmed
            host_request.status = HostRequestStatus.confirmed
            session.flush()

            notify(
                session,
                user_id=host_request.host_user_id,
                topic_action="host_request:confirm",
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestConfirm(
                    host_request=host_request_to_pb(host_request, session, context),
                    surfer=user_model_to_pb(host_request.surfer, session, context),
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

            count_host_response(host_request.host_user_id, "confirmed")

        if request.status == conversations_pb2.HOST_REQUEST_STATUS_CANCELLED:
            # only surfer can cancel
            if context.user_id != host_request.surfer_user_id:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            # can't cancel an already cancelled or rejected request
            if host_request.status == HostRequestStatus.rejected or host_request.status == HostRequestStatus.cancelled:
                context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "invalid_host_request_status")
            control_message.host_request_status_target = HostRequestStatus.cancelled
            host_request.status = HostRequestStatus.cancelled
            session.flush()

            notify(
                session,
                user_id=host_request.host_user_id,
                topic_action="host_request:cancel",
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestCancel(
                    host_request=host_request_to_pb(host_request, session, context),
                    surfer=user_model_to_pb(host_request.surfer, session, context),
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

            count_host_response(host_request.host_user_id, "cancelled")

        # record the status change in the conversation
        control_message.message_type = MessageType.host_request_status_changed
        control_message.conversation_id = host_request.conversation_id
        control_message.author_id = context.user_id
        session.add(control_message)

        if request.text:
            # the optional text goes in as a separate message after the control message
            latest_message = Message()
            latest_message.conversation_id = host_request.conversation_id
            latest_message.text = request.text
            latest_message.author_id = context.user_id
            latest_message.message_type = MessageType.text
            session.add(latest_message)
        else:
            latest_message = control_message

        # flush so that latest_message.id is available
        session.flush()

        # the caller has seen everything up to and including what they just sent
        if host_request.surfer_user_id == context.user_id:
            host_request.surfer_last_seen_message_id = latest_message.id
        else:
            host_request.host_last_seen_message_id = latest_message.id
        session.commit()

        return empty_pb2.Empty()

    def GetHostRequestMessages(self, request, context, session):
        """
        Returns a page of messages of a host request's conversation, newest first.

        ``request.last_message_id`` of 0 starts from the newest message, otherwise only messages with a
        lower id are returned. Aborts with NOT_FOUND if the request does not exist, is not visible, or
        the caller is neither its surfer nor its host.
        """
        host_request = session.execute(
            select(HostRequest)
            .where_moderated_content_visible(context, HostRequest, is_list_operation=False)
            .where(HostRequest.conversation_id == request.host_request_id)
        ).scalar_one_or_none()

        if not host_request:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
        pagination = min(pagination, MAX_PAGE_SIZE)

        # fetch one extra row to find out whether there is another page
        messages = (
            session.execute(
                select(Message)
                .where(Message.conversation_id == host_request.conversation_id)
                .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
                .order_by(Message.id.desc())
                .limit(pagination + 1)
            )
            .scalars()
            .all()
        )

        no_more = len(messages) <= pagination

        # lowest id on the returned page, used by the client as the cursor for the next page
        last_message_id = min(m.id if m else 1 for m in messages[:pagination]) if len(messages) > 0 else 0

        return requests_pb2.GetHostRequestMessagesRes(
            last_message_id=last_message_id,
            no_more=no_more,
            messages=[message_to_pb(message) for message in messages[:pagination]],
        )

    def SendHostRequestMessage(self, request, context, session):
        """
        Sends a text message in a host request's conversation and notifies the other party.

        Aborts with INVALID_ARGUMENT on empty text and with NOT_FOUND if the request does not exist, is
        not visible, or the caller is neither its surfer nor its host.
        """
        if request.text == "":
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")
        host_request = session.execute(
            select(HostRequest)
            .where_moderated_content_visible(context, HostRequest, is_list_operation=False)
            .where(HostRequest.conversation_id == request.host_request_id)
        ).scalar_one_or_none()

        if not host_request:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        # has to happen before the new message is added, as it counts the host's existing messages
        if host_request.host_user_id == context.user_id:
            _possibly_observe_first_response_time(session, host_request, context.user_id, "message")

        message = Message()
        message.conversation_id = host_request.conversation_id
        message.author_id = context.user_id
        message.message_type = MessageType.text
        message.text = request.text
        session.add(message)
        # flush so that message.id is available
        session.flush()

        if host_request.surfer_user_id == context.user_id:
            # message from the surfer: notify the host
            host_request.surfer_last_seen_message_id = message.id

            notify(
                session,
                user_id=host_request.host_user_id,
                topic_action="host_request:message",
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestMessage(
                    host_request=host_request_to_pb(host_request, session, context),
                    user=user_model_to_pb(host_request.surfer, session, context),
                    text=request.text,
                    am_host=True,
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

        else:
            # message from the host: notify the surfer
            host_request.host_last_seen_message_id = message.id

            notify(
                session,
                user_id=host_request.surfer_user_id,
                topic_action="host_request:message",
                key=str(host_request.conversation_id),
                data=notification_data_pb2.HostRequestMessage(
                    host_request=host_request_to_pb(host_request, session, context),
                    user=user_model_to_pb(host_request.host, session, context),
                    text=request.text,
                    am_host=False,
                ),
                moderation_state_id=host_request.moderation_state_id,
            )

        session.commit()

        user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
        sent_messages_counter.labels(user_gender, "host request").inc()

        return empty_pb2.Empty()

    def GetHostRequestUpdates(self, request, context, session):
        """
        Returns messages newer than ``request.newest_message_id`` across the caller's host requests.

        Each update carries the message, the id of its host request and that request's current status,
        ordered by ascending message id. ``only_sent`` and ``only_received`` are mutually exclusive.
        Aborts with INVALID_ARGUMENT if ``newest_message_id`` is 0 or does not refer to an existing message.
        """
        if request.only_sent and request.only_received:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "host_request_sent_or_received")

        if request.newest_message_id == 0:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

        if not session.execute(select(Message).where(Message.id == request.newest_message_id)).scalar_one_or_none():
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_message")

        pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
        pagination = min(pagination, MAX_PAGE_SIZE)

        statement = (
            select(
                Message,
                HostRequest.status.label("host_request_status"),
                HostRequest.conversation_id.label("host_request_id"),
            )
            .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
            .where_moderated_content_visible(context, HostRequest, is_list_operation=False)
            .where(Message.id > request.newest_message_id)
        )

        if request.only_sent:
            statement = statement.where(HostRequest.surfer_user_id == context.user_id)
        elif request.only_received:
            statement = statement.where(HostRequest.host_user_id == context.user_id)
        else:
            statement = statement.where(
                or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id)
            )

        # fetch one extra row to find out whether there is another page
        statement = statement.order_by(Message.id.asc()).limit(pagination + 1)
        res = session.execute(statement).all()

        no_more = len(res) <= pagination

        return requests_pb2.GetHostRequestUpdatesRes(
            no_more=no_more,
            updates=[
                requests_pb2.HostRequestUpdate(
                    host_request_id=result.host_request_id,
                    status=hostrequeststatus2api[result.host_request_status],
                    message=message_to_pb(result.Message),
                )
                for result in res[:pagination]
            ],
        )

    def MarkLastSeenHostRequest(self, request, context, session):
        """
        Advances the caller's last seen message id for a host request.

        Aborts with NOT_FOUND if the request does not exist, is not visible, or the caller is neither its
        surfer nor its host, and with FAILED_PRECONDITION if the new id is lower than the current one.
        """
        host_request = session.execute(
            select(HostRequest)
            .where_moderated_content_visible(context, HostRequest, is_list_operation=False)
            .where(HostRequest.conversation_id == request.host_request_id)
        ).scalar_one_or_none()

        if not host_request:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        # the last seen marker may only move forwards (or stay where it is)
        if host_request.surfer_user_id == context.user_id:
            if not host_request.surfer_last_seen_message_id <= request.last_seen_message_id:
                context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
            host_request.surfer_last_seen_message_id = request.last_seen_message_id
        else:
            if not host_request.host_last_seen_message_id <= request.last_seen_message_id:
                context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cant_unsee_messages")
            host_request.host_last_seen_message_id = request.last_seen_message_id

        session.commit()
        return empty_pb2.Empty()

    def SetHostRequestArchiveStatus(self, request, context, session):
        """
        Sets the caller's own archived flag on a host request to ``request.is_archived``.

        The surfer and the host each have their own flag. Aborts with NOT_FOUND unless the request
        exists, is visible and the caller is its surfer or its host.
        """
        host_request: HostRequest = session.execute(
            select(HostRequest)
            .where_moderated_content_visible(context, HostRequest, is_list_operation=False)
            .where(HostRequest.conversation_id == request.host_request_id)
            .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id))
        ).scalar_one_or_none()

        if not host_request:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        if context.user_id == host_request.surfer_user_id:
            host_request.is_surfer_archived = request.is_archived
        else:
            host_request.is_host_archived = request.is_archived

        return requests_pb2.SetHostRequestArchiveStatusRes(
            host_request_id=host_request.conversation_id,
            is_archived=request.is_archived,
        )

    def GetResponseRate(self, request, context, session):
        """
        Returns the response rate of the user ``request.user_id``.

        Aborts with NOT_FOUND if that user does not exist or is not visible to the caller.
        """
        # outer join: a visible user without a response rate row still yields a result
        user_res = session.execute(
            select(User.id, UserResponseRate)
            .outerjoin(UserResponseRate, UserResponseRate.user_id == User.id)
            .where_users_visible(context)
            .where(User.id == request.user_id)
        ).one_or_none()

        # if the user doesn't exist (or isn't visible), report not found
        if not user_res:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

        _, response_rates = user_res
        return requests_pb2.GetResponseRateRes(**response_rate_to_pb(response_rates))

    def SendHostRequestFeedback(self, request, context, session):
        """
        Stores the host's feedback about a host request they received.

        Aborts with NOT_FOUND unless the request exists, is visible and the caller is its host, and with
        FAILED_PRECONDITION if the caller already left feedback for it.
        """
        host_request = session.execute(
            select(HostRequest)
            .where_moderated_content_visible(context, HostRequest, is_list_operation=False)
            .where(HostRequest.conversation_id == request.host_request_id)
            .where(HostRequest.host_user_id == context.user_id)
        ).scalar_one_or_none()

        if not host_request:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "host_request_not_found")

        feedback = session.execute(
            select(HostRequestFeedback)
            .where(HostRequestFeedback.host_request_id == host_request.conversation_id)
            .where(HostRequestFeedback.from_user_id == context.user_id)
        ).scalar_one_or_none()

        # feedback can only be left once per host request
        if feedback:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_left_host_request_feedback")

        session.add(
            HostRequestFeedback(
                host_request_id=host_request.conversation_id,
                from_user_id=host_request.host_user_id,
                to_user_id=host_request.surfer_user_id,
                # .get() yields None for a quality value that has no mapping
                request_quality=hostrequestquality2sql.get(request.host_request_quality),
                decline_reason=request.decline_reason,
            )
        )

        return empty_pb2.Empty()