Coverage for src/couchers/servicers/requests.py: 93%
270 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-11-21 04:21 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2024-11-21 04:21 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy import Float
7from sqlalchemy.orm import aliased
8from sqlalchemy.sql import and_, func, or_
9from sqlalchemy.sql.functions import percentile_disc
11from couchers import errors
12from couchers.metrics import (
13 account_age_on_host_request_create_histogram,
14 host_request_first_response_histogram,
15 host_request_responses_counter,
16 host_requests_sent_counter,
17 sent_messages_counter,
18)
19from couchers.models import Conversation, HostRequest, HostRequestStatus, Message, MessageType, User
20from couchers.notifications.notify import notify
21from couchers.servicers.api import user_model_to_pb
22from couchers.sql import couchers_select as select
23from couchers.utils import (
24 Duration_from_timedelta,
25 Timestamp_from_datetime,
26 date_to_api,
27 now,
28 parse_date,
29 today_in_timezone,
30)
31from proto import conversations_pb2, notification_data_pb2, requests_pb2, requests_pb2_grpc
logger = logging.getLogger(__name__)

# Page size used when the client does not request a positive number of items.
DEFAULT_PAGINATION_LENGTH = 10
# Upper bound applied to any client-requested page size.
MAX_PAGE_SIZE = 50


# Translates the database-side HostRequestStatus enum into the protobuf enum value sent over the API.
hostrequeststatus2api = {
    HostRequestStatus.pending: conversations_pb2.HOST_REQUEST_STATUS_PENDING,
    HostRequestStatus.accepted: conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
    HostRequestStatus.rejected: conversations_pb2.HOST_REQUEST_STATUS_REJECTED,
    HostRequestStatus.confirmed: conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED,
    HostRequestStatus.cancelled: conversations_pb2.HOST_REQUEST_STATUS_CANCELLED,
}
def message_to_pb(message: Message):
    """
    Converts a Message model into a conversations_pb2.Message.

    Normal messages carry their text. Any other message is a control message, for which at most one of
    `chat_created` / `host_request_status_changed` is populated, depending on `message.message_type`.
    """
    # fields present on every message regardless of its type
    common_fields = dict(
        message_id=message.id,
        author_user_id=message.author_id,
        time=Timestamp_from_datetime(message.time),
    )

    if message.is_normal_message:
        return conversations_pb2.Message(
            text=conversations_pb2.MessageContentText(text=message.text),
            **common_fields,
        )

    # control message: a content field left as None is simply not set on the protobuf
    chat_created = None
    if message.message_type == MessageType.chat_created:
        chat_created = conversations_pb2.MessageContentChatCreated()

    status_changed = None
    if message.message_type == MessageType.host_request_status_changed:
        status_changed = conversations_pb2.MessageContentHostRequestStatusChanged(
            status=hostrequeststatus2api[message.host_request_status_target]
        )

    return conversations_pb2.Message(
        chat_created=chat_created,
        host_request_status_changed=status_changed,
        **common_fields,
    )
def host_request_to_pb(host_request: HostRequest, session, context):
    """
    Builds the requests_pb2.HostRequest representation of `host_request` as seen by the calling user.

    The `created` timestamp is taken from the first message in the request's conversation, and
    `last_seen_message_id` is the surfer's or the host's marker depending on which of them `context.user_id` is.
    """
    # both lookups share the same conversation filter and differ only in sort direction
    conversation_messages = select(Message).where(Message.conversation_id == host_request.conversation_id)

    initial_message = session.execute(conversation_messages.order_by(Message.id.asc()).limit(1)).scalar_one()
    latest_message = session.execute(conversation_messages.order_by(Message.id.desc()).limit(1)).scalar_one()

    if context.user_id == host_request.surfer_user_id:
        last_seen_message_id = host_request.surfer_last_seen_message_id
    else:
        last_seen_message_id = host_request.host_last_seen_message_id

    return requests_pb2.HostRequest(
        host_request_id=host_request.conversation_id,
        surfer_user_id=host_request.surfer_user_id,
        host_user_id=host_request.host_user_id,
        status=hostrequeststatus2api[host_request.status],
        created=Timestamp_from_datetime(initial_message.time),
        from_date=date_to_api(host_request.from_date),
        to_date=date_to_api(host_request.to_date),
        last_seen_message_id=last_seen_message_id,
        latest_message=message_to_pb(latest_message),
    )
def _possibly_observe_first_response_time(session, host_request, user_id, response_type):
    """
    Records the host's first-response latency in `host_request_first_response_histogram`, if this is their
    first response.

    Must be called before the response being observed is added to the conversation, and only with the host's
    user id. The observation is labelled with the host's gender, the surfer's gender and `response_type`.
    """
    assert host_request.host_user_id == user_id

    messages_by_host_so_far = session.execute(
        select(func.count())
        .where(Message.conversation_id == host_request.conversation_id)
        .where(Message.author_id == user_id)
    ).scalar_one_or_none()

    # if this is the first response then there's nothing by this user yet
    if messages_by_host_so_far != 0:
        return

    host_gender = session.execute(select(User.gender).where(User.id == host_request.host_user_id)).scalar_one()
    surfer_gender = session.execute(select(User.gender).where(User.id == host_request.surfer_user_id)).scalar_one()
    seconds_since_created = (now() - host_request.conversation.created).total_seconds()
    host_request_first_response_histogram.labels(host_gender, surfer_gender, response_type).observe(
        seconds_since_created
    )
class Requests(requests_pb2_grpc.RequestsServicer):
    """
    gRPC servicer for host requests: creating, fetching and listing them, responding to them, and exchanging
    messages in the conversation attached to each request.

    Every handler receives the protobuf `request`, the gRPC `context` (which exposes the calling user's id as
    `context.user_id`) and a database `session`. Handlers rely on `context.abort` raising, so no code after an
    abort call runs.
    """

    def CreateHostRequest(self, request, context, session):
        """
        Creates a pending host request from the calling user (the surfer) to `request.host_user_id`.

        Aborts if the caller's profile is incomplete, the caller requests themselves, the host is not
        found/visible, or the dates are invalid. On success a conversation is created holding a `chat_created`
        control message plus the request text, the host is notified, and metrics are recorded.
        """
        user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
        if not user.has_completed_profile:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.INCOMPLETE_PROFILE_SEND_REQUEST)

        if request.host_user_id == context.user_id:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.CANT_REQUEST_SELF)

        # just to check host exists and is visible
        host = session.execute(
            select(User).where_users_visible(context).where(User.id == request.host_user_id)
        ).scalar_one_or_none()
        if not host:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        from_date = parse_date(request.from_date)
        to_date = parse_date(request.to_date)

        if not from_date or not to_date:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_DATE)

        # "today" is evaluated in the host's timezone
        today = today_in_timezone(host.timezone)

        # request starts from the past
        if from_date < today:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_FROM_BEFORE_TODAY)

        # the stay must end strictly after it starts
        if from_date >= to_date:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_FROM_AFTER_TO)

        # No need to check today > to_date

        if from_date - today > timedelta(days=365):
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_FROM_AFTER_ONE_YEAR)

        if to_date - from_date > timedelta(days=365):
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.DATE_TO_AFTER_ONE_YEAR)

        conversation = Conversation()
        session.add(conversation)
        # flush so that conversation.id is populated
        session.flush()

        session.add(
            Message(
                conversation_id=conversation.id,
                author_id=context.user_id,
                message_type=MessageType.chat_created,
            )
        )

        message = Message(
            conversation_id=conversation.id,
            author_id=context.user_id,
            text=request.text,
            message_type=MessageType.text,
        )
        session.add(message)
        # flush so that message.id is populated
        session.flush()

        host_request = HostRequest(
            conversation_id=conversation.id,
            surfer_user_id=context.user_id,
            host_user_id=host.id,
            from_date=from_date,
            to_date=to_date,
            status=HostRequestStatus.pending,
            # the surfer wrote the request text, so they have seen it
            surfer_last_seen_message_id=message.id,
            # TODO: tz
            # timezone=host.timezone,
        )
        session.add(host_request)
        session.commit()

        notify(
            session,
            user_id=host_request.host_user_id,
            topic_action="host_request:create",
            key=host_request.conversation_id,
            data=notification_data_pb2.HostRequestCreate(
                host_request=host_request_to_pb(host_request, session, context),
                surfer=user_model_to_pb(host_request.surfer, session, context),
                text=request.text,
            ),
        )

        host_requests_sent_counter.labels(user.gender, host.gender).inc()
        sent_messages_counter.labels(user.gender, "host request send").inc()
        account_age_on_host_request_create_histogram.labels(user.gender, host.gender).observe(
            (now() - user.joined).total_seconds()
        )

        return requests_pb2.CreateHostRequestRes(host_request_id=host_request.conversation_id)

    def GetHostRequest(self, request, context, session):
        """
        Returns a single host request the caller takes part in (as surfer or host).

        Aborts with NOT_FOUND if no such request exists, the caller is not a participant, or either participant
        is filtered out by the visibility checks.
        """
        host_request = session.execute(
            select(HostRequest)
            .where_users_column_visible(context, HostRequest.surfer_user_id)
            .where_users_column_visible(context, HostRequest.host_user_id)
            .where(HostRequest.conversation_id == request.host_request_id)
            .where(or_(HostRequest.surfer_user_id == context.user_id, HostRequest.host_user_id == context.user_id))
        ).scalar_one_or_none()

        if not host_request:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

        return host_request_to_pb(host_request, session, context)

    def ListHostRequests(self, request, context, session):
        """
        Lists the caller's host requests, newest activity first, paginated by the id of each request's latest
        message.

        `only_sent` / `only_received` (mutually exclusive) restrict the result to requests where the caller is
        the surfer / the host. `only_active` keeps only pending, accepted or confirmed requests that have not
        ended yet.
        """
        if request.only_sent and request.only_received:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.HOST_REQUEST_SENT_OR_RECEIVED)

        pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
        pagination = min(pagination, MAX_PAGE_SIZE)

        # By outer joining messages on itself where the second id is bigger, only the highest IDs will have
        # none as message_2.id. So just filter for these ones to get highest messages only.
        # See https://stackoverflow.com/a/27802817/6115336
        message_2 = aliased(Message)
        statement = (
            select(Message, HostRequest, Conversation)
            .outerjoin(message_2, and_(Message.conversation_id == message_2.conversation_id, Message.id < message_2.id))
            .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
            .join(Conversation, Conversation.id == HostRequest.conversation_id)
            .where_users_column_visible(context, HostRequest.surfer_user_id)
            .where_users_column_visible(context, HostRequest.host_user_id)
            .where(message_2.id.is_(None))
            .where(or_(Message.id < request.last_request_id, request.last_request_id == 0))
        )

        if request.only_sent:
            statement = statement.where(HostRequest.surfer_user_id == context.user_id)
        elif request.only_received:
            statement = statement.where(HostRequest.host_user_id == context.user_id)
        else:
            statement = statement.where(
                or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id)
            )

        # TODO: I considered having the latest control message be the single source of truth for
        # the HostRequest.status, but decided against it because of this filter.
        # Another possibility is to filter in the python instead of SQL, but that's slower
        if request.only_active:
            statement = statement.where(
                or_(
                    HostRequest.status == HostRequestStatus.pending,
                    HostRequest.status == HostRequestStatus.accepted,
                    HostRequest.status == HostRequestStatus.confirmed,
                )
            )
            # A request is "in the past" once end_time < now() (see RespondHostRequest and
            # SendHostRequestMessage), so active requests are those that have not ended yet.
            statement = statement.where(HostRequest.end_time >= func.now())

        # fetch one extra row to find out whether there is a further page
        statement = statement.order_by(Message.id.desc()).limit(pagination + 1)
        results = session.execute(statement).all()

        host_requests = [
            requests_pb2.HostRequest(
                host_request_id=result.HostRequest.conversation_id,
                surfer_user_id=result.HostRequest.surfer_user_id,
                host_user_id=result.HostRequest.host_user_id,
                status=hostrequeststatus2api[result.HostRequest.status],
                created=Timestamp_from_datetime(result.Conversation.created),
                from_date=date_to_api(result.HostRequest.from_date),
                to_date=date_to_api(result.HostRequest.to_date),
                last_seen_message_id=(
                    result.HostRequest.surfer_last_seen_message_id
                    if context.user_id == result.HostRequest.surfer_user_id
                    else result.HostRequest.host_last_seen_message_id
                ),
                latest_message=message_to_pb(result.Message),
            )
            for result in results[:pagination]
        ]
        last_request_id = min(g.Message.id for g in results[:pagination]) if len(results) > pagination else 0  # TODO
        no_more = len(results) <= pagination

        return requests_pb2.ListHostRequestsRes(
            last_request_id=last_request_id, no_more=no_more, host_requests=host_requests
        )

    def RespondHostRequest(self, request, context, session):
        """
        Changes the status of a host request and optionally attaches a text message.

        Allowed transitions:
        - accepted: host only; not from cancelled, confirmed or accepted
        - rejected: host only; not from cancelled or rejected
        - confirmed: surfer only; only from accepted
        - cancelled: surfer only; not from rejected or cancelled

        Any other target status (including pending) is refused. Each successful transition notifies the other
        party, records metrics and adds a `host_request_status_changed` control message to the conversation.
        """

        def count_host_response(other_user_id, response_type):
            # metrics are labelled with the responder's gender and the other party's gender
            user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
            other_gender = session.execute(select(User.gender).where(User.id == other_user_id)).scalar_one()
            host_request_responses_counter.labels(user_gender, other_gender, response_type).inc()
            sent_messages_counter.labels(user_gender, "host request response").inc()

        host_request = session.execute(
            select(HostRequest)
            .where_users_column_visible(context, HostRequest.surfer_user_id)
            .where_users_column_visible(context, HostRequest.host_user_id)
            .where(HostRequest.conversation_id == request.host_request_id)
        ).scalar_one_or_none()

        if not host_request:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

        if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

        # Only these four targets are handled below. Pending, and any other value the client might send, must be
        # refused here: otherwise a status-change control message without a target status would be stored.
        if request.status not in (
            conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED,
            conversations_pb2.HOST_REQUEST_STATUS_REJECTED,
            conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED,
            conversations_pb2.HOST_REQUEST_STATUS_CANCELLED,
        ):
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)

        if host_request.end_time < now():
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.HOST_REQUEST_IN_PAST)

        control_message = Message()

        if request.status == conversations_pb2.HOST_REQUEST_STATUS_ACCEPTED:
            # only host can accept
            if context.user_id != host_request.host_user_id:
                context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.NOT_THE_HOST)
            # can't accept a cancelled or confirmed request (only reject), or already accepted
            if host_request.status in (
                HostRequestStatus.cancelled,
                HostRequestStatus.confirmed,
                HostRequestStatus.accepted,
            ):
                context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
            _possibly_observe_first_response_time(session, host_request, context.user_id, "accepted")
            control_message.host_request_status_target = HostRequestStatus.accepted
            host_request.status = HostRequestStatus.accepted
            session.flush()

            notify(
                session,
                user_id=host_request.surfer_user_id,
                topic_action="host_request:accept",
                key=host_request.conversation_id,
                data=notification_data_pb2.HostRequestAccept(
                    host_request=host_request_to_pb(host_request, session, context),
                    host=user_model_to_pb(host_request.host, session, context),
                ),
            )

            count_host_response(host_request.surfer_user_id, "accepted")

        elif request.status == conversations_pb2.HOST_REQUEST_STATUS_REJECTED:
            # only host can reject
            if context.user_id != host_request.host_user_id:
                context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
            # can't reject a cancelled or already rejected request
            if host_request.status in (HostRequestStatus.cancelled, HostRequestStatus.rejected):
                context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
            _possibly_observe_first_response_time(session, host_request, context.user_id, "rejected")
            control_message.host_request_status_target = HostRequestStatus.rejected
            host_request.status = HostRequestStatus.rejected
            session.flush()

            notify(
                session,
                user_id=host_request.surfer_user_id,
                topic_action="host_request:reject",
                key=host_request.conversation_id,
                data=notification_data_pb2.HostRequestReject(
                    host_request=host_request_to_pb(host_request, session, context),
                    host=user_model_to_pb(host_request.host, session, context),
                ),
            )

            count_host_response(host_request.surfer_user_id, "rejected")

        elif request.status == conversations_pb2.HOST_REQUEST_STATUS_CONFIRMED:
            # only surfer can confirm
            if context.user_id != host_request.surfer_user_id:
                context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
            # can only confirm an accepted request
            if host_request.status != HostRequestStatus.accepted:
                context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
            control_message.host_request_status_target = HostRequestStatus.confirmed
            host_request.status = HostRequestStatus.confirmed
            session.flush()

            notify(
                session,
                user_id=host_request.host_user_id,
                topic_action="host_request:confirm",
                key=host_request.conversation_id,
                data=notification_data_pb2.HostRequestConfirm(
                    host_request=host_request_to_pb(host_request, session, context),
                    surfer=user_model_to_pb(host_request.surfer, session, context),
                ),
            )

            count_host_response(host_request.host_user_id, "confirmed")

        elif request.status == conversations_pb2.HOST_REQUEST_STATUS_CANCELLED:
            # only surfer can cancel
            if context.user_id != host_request.surfer_user_id:
                context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
            # can't cancel an already cancelled or rejected request
            if host_request.status in (HostRequestStatus.rejected, HostRequestStatus.cancelled):
                context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.INVALID_HOST_REQUEST_STATUS)
            control_message.host_request_status_target = HostRequestStatus.cancelled
            host_request.status = HostRequestStatus.cancelled
            session.flush()

            notify(
                session,
                user_id=host_request.host_user_id,
                topic_action="host_request:cancel",
                key=host_request.conversation_id,
                data=notification_data_pb2.HostRequestCancel(
                    host_request=host_request_to_pb(host_request, session, context),
                    surfer=user_model_to_pb(host_request.surfer, session, context),
                ),
            )

            count_host_response(host_request.host_user_id, "cancelled")

        control_message.message_type = MessageType.host_request_status_changed
        control_message.conversation_id = host_request.conversation_id
        control_message.author_id = context.user_id
        session.add(control_message)

        if request.text:
            latest_message = Message()
            latest_message.conversation_id = host_request.conversation_id
            latest_message.text = request.text
            latest_message.author_id = context.user_id
            latest_message.message_type = MessageType.text
            session.add(latest_message)
        else:
            latest_message = control_message

        # flush so that latest_message.id is populated
        session.flush()

        # the responder has by definition seen everything up to their own latest message
        if host_request.surfer_user_id == context.user_id:
            host_request.surfer_last_seen_message_id = latest_message.id
        else:
            host_request.host_last_seen_message_id = latest_message.id
        session.commit()

        return empty_pb2.Empty()

    def GetHostRequestMessages(self, request, context, session):
        """
        Returns a page of messages from a host request's conversation, newest first.

        Pagination is by message id: only messages with an id below `request.last_message_id` are returned
        (0 means start from the newest). Aborts with NOT_FOUND if the request does not exist or the caller is
        not a participant.
        """
        # NOTE(review): unlike GetHostRequest, this lookup does not apply the where_users_column_visible
        # filters -- confirm this is intentional
        host_request = session.execute(
            select(HostRequest).where(HostRequest.conversation_id == request.host_request_id)
        ).scalar_one_or_none()

        if not host_request:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

        if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

        pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
        pagination = min(pagination, MAX_PAGE_SIZE)

        # fetch one extra row to find out whether there is a further page
        messages = (
            session.execute(
                select(Message)
                .where(Message.conversation_id == host_request.conversation_id)
                .where(or_(Message.id < request.last_message_id, request.last_message_id == 0))
                .order_by(Message.id.desc())
                .limit(pagination + 1)
            )
            .scalars()
            .all()
        )

        no_more = len(messages) <= pagination

        last_message_id = min(m.id if m else 1 for m in messages[:pagination]) if len(messages) > 0 else 0

        return requests_pb2.GetHostRequestMessagesRes(
            last_message_id=last_message_id,
            no_more=no_more,
            messages=[message_to_pb(message) for message in messages[:pagination]],
        )

    def SendHostRequestMessage(self, request, context, session):
        """
        Adds a text message from the caller to a host request's conversation and notifies the other party.

        Aborts if the text is empty, the request does not exist or the caller is not a participant, the request
        has been rejected or cancelled, or the request has already ended.
        """
        if request.text == "":
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE)
        # NOTE(review): unlike GetHostRequest, this lookup does not apply the where_users_column_visible
        # filters -- confirm this is intentional
        host_request = session.execute(
            select(HostRequest).where(HostRequest.conversation_id == request.host_request_id)
        ).scalar_one_or_none()

        if not host_request:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

        if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

        if host_request.status in (HostRequestStatus.rejected, HostRequestStatus.cancelled):
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.HOST_REQUEST_CLOSED)

        if host_request.end_time < now():
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.HOST_REQUEST_IN_PAST)

        # must run before the new message is added, as it counts the host's existing messages
        if host_request.host_user_id == context.user_id:
            _possibly_observe_first_response_time(session, host_request, context.user_id, "message")

        message = Message()
        message.conversation_id = host_request.conversation_id
        message.author_id = context.user_id
        message.message_type = MessageType.text
        message.text = request.text
        session.add(message)
        # flush so that message.id is populated
        session.flush()

        if host_request.surfer_user_id == context.user_id:
            host_request.surfer_last_seen_message_id = message.id

            # the surfer wrote the message, so the host gets notified
            notify(
                session,
                user_id=host_request.host_user_id,
                topic_action="host_request:message",
                key=host_request.conversation_id,
                data=notification_data_pb2.HostRequestMessage(
                    host_request=host_request_to_pb(host_request, session, context),
                    user=user_model_to_pb(host_request.surfer, session, context),
                    text=request.text,
                    am_host=True,
                ),
            )

        else:
            host_request.host_last_seen_message_id = message.id

            # the host wrote the message, so the surfer gets notified
            notify(
                session,
                user_id=host_request.surfer_user_id,
                topic_action="host_request:message",
                key=host_request.conversation_id,
                data=notification_data_pb2.HostRequestMessage(
                    host_request=host_request_to_pb(host_request, session, context),
                    user=user_model_to_pb(host_request.host, session, context),
                    text=request.text,
                    am_host=False,
                ),
            )

        session.commit()

        user_gender = session.execute(select(User.gender).where(User.id == context.user_id)).scalar_one()
        sent_messages_counter.labels(user_gender, "host request").inc()

        return empty_pb2.Empty()

    def GetHostRequestUpdates(self, request, context, session):
        """
        Returns messages newer than `request.newest_message_id` across the caller's host requests, oldest
        first, each paired with the id and current status of the host request it belongs to.

        `only_sent` / `only_received` (mutually exclusive) restrict the result to requests where the caller is
        the surfer / the host. Aborts if `newest_message_id` is 0 or does not refer to an existing message.
        """
        if request.only_sent and request.only_received:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.HOST_REQUEST_SENT_OR_RECEIVED)

        if request.newest_message_id == 0:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE)

        if not session.execute(select(Message).where(Message.id == request.newest_message_id)).scalar_one_or_none():
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_MESSAGE)

        pagination = request.number if request.number > 0 else DEFAULT_PAGINATION_LENGTH
        pagination = min(pagination, MAX_PAGE_SIZE)

        statement = (
            select(
                Message,
                HostRequest.status.label("host_request_status"),
                HostRequest.conversation_id.label("host_request_id"),
            )
            .join(HostRequest, HostRequest.conversation_id == Message.conversation_id)
            .where(Message.id > request.newest_message_id)
        )

        if request.only_sent:
            statement = statement.where(HostRequest.surfer_user_id == context.user_id)
        elif request.only_received:
            statement = statement.where(HostRequest.host_user_id == context.user_id)
        else:
            statement = statement.where(
                or_(HostRequest.host_user_id == context.user_id, HostRequest.surfer_user_id == context.user_id)
            )

        # fetch one extra row to find out whether there is a further page
        statement = statement.order_by(Message.id.asc()).limit(pagination + 1)
        res = session.execute(statement).all()

        no_more = len(res) <= pagination

        return requests_pb2.GetHostRequestUpdatesRes(
            no_more=no_more,
            updates=[
                requests_pb2.HostRequestUpdate(
                    host_request_id=result.host_request_id,
                    status=hostrequeststatus2api[result.host_request_status],
                    message=message_to_pb(result.Message),
                )
                for result in res[:pagination]
            ],
        )

    def MarkLastSeenHostRequest(self, request, context, session):
        """
        Advances the caller's "last seen message" marker on a host request.

        Aborts with NOT_FOUND if the request does not exist or the caller is not a participant, and with
        FAILED_PRECONDITION if the new marker would be lower than the current one.
        """
        # NOTE(review): unlike GetHostRequest, this lookup does not apply the where_users_column_visible
        # filters -- confirm this is intentional
        host_request = session.execute(
            select(HostRequest).where(HostRequest.conversation_id == request.host_request_id)
        ).scalar_one_or_none()

        if not host_request:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

        if host_request.surfer_user_id != context.user_id and host_request.host_user_id != context.user_id:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.HOST_REQUEST_NOT_FOUND)

        if host_request.surfer_user_id == context.user_id:
            if host_request.surfer_last_seen_message_id > request.last_seen_message_id:
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNSEE_MESSAGES)
            host_request.surfer_last_seen_message_id = request.last_seen_message_id
        else:
            if host_request.host_last_seen_message_id > request.last_seen_message_id:
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANT_UNSEE_MESSAGES)
            host_request.host_last_seen_message_id = request.last_seen_message_id

        session.commit()
        return empty_pb2.Empty()

    def GetResponseRate(self, request, context, session):
        """
        Returns a coarse summary of how reliably and how quickly `request.user_id` responds to host requests
        they receive.

        The result is one of: `insufficient_data` (fewer than 3 requests), `low` (response rate <= 0.33),
        `some` (<= 0.66, with the 33rd percentile response time), `most` (<= 0.90, with the 33rd and 66th
        percentiles) or `almost_all` (above 0.90, with both percentiles). Response times are rounded to whole
        minutes. Aborts with NOT_FOUND if the user is not found/visible.
        """
        # this subquery gets the time that the request was sent
        t = (
            select(Message.conversation_id, Message.time)
            .where(Message.message_type == MessageType.chat_created)
            .subquery()
        )
        # this subquery gets the time that the user responded to the request
        s = (
            select(Message.conversation_id, func.min(Message.time).label("time"))
            .where(Message.author_id == request.user_id)
            .group_by(Message.conversation_id)
            .subquery()
        )

        res = session.execute(
            select(
                User.id,
                # number of requests received
                func.count().label("n"),
                # percentage of requests responded to
                (func.count(s.c.time) / func.cast(func.greatest(func.count(t.c.time), 1.0), Float)).label(
                    "response_rate"
                ),
                # the 33rd percentile response time; unanswered requests count as a 1000 day response time
                percentile_disc(0.33)
                .within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000)))
                .label("response_time_p33"),
                # the 66th percentile response time; unanswered requests count as a 1000 day response time
                percentile_disc(0.66)
                .within_group(func.coalesce(s.c.time - t.c.time, timedelta(days=1000)))
                .label("response_time_p66"),
            )
            .where_users_visible(context)
            .where(User.id == request.user_id)
            .outerjoin(HostRequest, HostRequest.host_user_id == User.id)
            .outerjoin(t, t.c.conversation_id == HostRequest.conversation_id)
            .outerjoin(s, s.c.conversation_id == HostRequest.conversation_id)
            .group_by(User.id)
        ).one_or_none()

        if not res:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        _, n, response_rate, response_time_p33, response_time_p66 = res

        if n < 3:
            return requests_pb2.GetResponseRateRes(
                insufficient_data=requests_pb2.ResponseRateInsufficientData(),
            )

        if response_rate <= 0.33:
            return requests_pb2.GetResponseRateRes(
                low=requests_pb2.ResponseRateLow(),
            )

        # round to the nearest whole minute
        response_time_p33_coarsened = Duration_from_timedelta(
            timedelta(seconds=round(response_time_p33.total_seconds() / 60) * 60)
        )

        if response_rate <= 0.66:
            return requests_pb2.GetResponseRateRes(
                some=requests_pb2.ResponseRateSome(response_time_p33=response_time_p33_coarsened),
            )

        # round to the nearest whole minute
        response_time_p66_coarsened = Duration_from_timedelta(
            timedelta(seconds=round(response_time_p66.total_seconds() / 60) * 60)
        )

        if response_rate <= 0.90:
            return requests_pb2.GetResponseRateRes(
                most=requests_pb2.ResponseRateMost(
                    response_time_p33=response_time_p33_coarsened, response_time_p66=response_time_p66_coarsened
                ),
            )
        else:
            return requests_pb2.GetResponseRateRes(
                almost_all=requests_pb2.ResponseRateAlmostAll(
                    response_time_p33=response_time_p33_coarsened, response_time_p66=response_time_p66_coarsened
                ),
            )