Coverage for src/couchers/servicers/search.py: 84%
247 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-04-16 15:13 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-04-16 15:13 +0000
1"""
2See //docs/search.md for overview.
3"""
5from datetime import timedelta
7import grpc
8from sqlalchemy.sql import and_, func, or_
10from couchers import errors
11from couchers.crypto import decrypt_page_token, encrypt_page_token
12from couchers.models import (
13 Cluster,
14 ClusterSubscription,
15 Event,
16 EventOccurrence,
17 EventOccurrenceAttendee,
18 EventOrganizer,
19 EventSubscription,
20 LanguageAbility,
21 Node,
22 Page,
23 PageType,
24 PageVersion,
25 Reference,
26 StrongVerificationAttempt,
27 User,
28)
29from couchers.servicers.account import has_strong_verification
30from couchers.servicers.api import (
31 fluency2sql,
32 hostingstatus2sql,
33 meetupstatus2sql,
34 parkingdetails2sql,
35 sleepingarrangement2sql,
36 smokinglocation2sql,
37 user_model_to_pb,
38)
39from couchers.servicers.communities import community_to_pb
40from couchers.servicers.events import event_to_pb
41from couchers.servicers.groups import group_to_pb
42from couchers.servicers.pages import page_to_pb
43from couchers.sql import couchers_select as select
44from couchers.utils import (
45 create_coordinate,
46 dt_from_millis,
47 last_active_coarsen,
48 millis_from_dt,
49 now,
50 to_aware_datetime,
51)
52from proto import search_pb2, search_pb2_grpc
# searches are a bit expensive, we'd rather send back a bunch of results at once than lots of small pages
MAX_PAGINATION_LENGTH = 100
# text search configuration handed to to_tsvector, websearch_to_tsquery and ts_headline
REGCONFIG = "english"
# a title only counts as a trigram match when its word_similarity to the query is strictly above this
TRI_SIMILARITY_THRESHOLD = 0.6
# factor the title similarity is multiplied by before ts_rank_cd is added to form the rank
TRI_SIMILARITY_WEIGHT = 5
62def _join_with_space(coalesces):
63 # the objects in coalesces are not strings, so we can't do " ".join(coalesces). They're SQLAlchemy magic.
64 if not coalesces:
65 return ""
66 out = coalesces[0]
67 for coalesce in coalesces[1:]:
68 out += " " + coalesce
69 return out
def _build_tsv(A, B=None, C=None, D=None):
    """
    Builds a weighted tsvector expression from up to four lists of text fields.

    The fields in A are always included and tagged with weight "A". B, C and D are concatenated onto the
    result with their own weight letter only when they are non-empty. Every field is wrapped in
    coalesce(field, "") before the fields of one list are joined with spaces.
    """

    def weighted(fields, weight):
        text = _join_with_space([func.coalesce(field, "") for field in fields])
        return func.setweight(func.to_tsvector(REGCONFIG, text), weight)

    tsv = weighted(A, "A")
    for fields, weight in ((B, "B"), (C, "C"), (D, "D")):
        # None and [] both mean "nothing at this weight"
        if fields:
            tsv = tsv.concat(weighted(fields, weight))
    return tsv
def _build_doc(A, B=None, C=None, D=None):
    """
    Builds the plain text document (no to_tsvector, no weights) that snippets are extracted from.

    The fields of A come first, followed by those of each non-empty list among B, C and D, all separated
    by single spaces. Every field is wrapped in coalesce(field, "").
    """
    doc = _join_with_space([func.coalesce(field, "") for field in A])
    for fields in (B, C, D):
        # None and [] contribute nothing, not even the separating space
        if not fields:
            continue
        doc += " " + _join_with_space([func.coalesce(field, "") for field in fields])
    return doc
def _similarity(statement, text):
    """
    Returns a word_similarity(statement, text) expression, with both arguments passed through unaccent first.
    """
    needle = func.unaccent(statement)
    haystack = func.unaccent(text)
    return func.word_similarity(needle, haystack)
def _gen_search_elements(statement, title_only, next_rank, page_size, A, B=None, C=None, D=None):
    """
    Generates the postgres expressions needed to run a text search over four sets of fields (A, B, C, D).

    statement is the search query text: it is fed to websearch_to_tsquery and to the trigram similarity.

    The four sets are in decreasing order of "importance" for ranking. A should be the "title", the others
    can be anything.

    If title_only=True, a row matches only through trigram similarity against A, and the rank is that
    similarity alone. Otherwise a row matches through either the full text query or the trigram similarity,
    and the rank is the weighted similarity plus ts_rank_cd.

    Returns a tuple (rank, snippet, execute_search_statement):
    * rank: expression labelled "rank"
    * snippet: ts_headline expression labelled "snippet", with matches wrapped in **
    * execute_search_statement(session, orig_statement): applies the match filter, the next_rank cut-off (if
      next_rank is not None), descending rank order and a limit of page_size + 1 to orig_statement, then
      executes it and returns all rows
    """
    B = B or []
    C = C or []
    D = D or []

    # pieces needed in both modes
    title = _build_doc(A)
    # trigram based text similarity between the title and the query text
    sim = _similarity(statement, title)
    # tsquery used for the headline, and for matching when not title_only
    tsq = func.websearch_to_tsquery(REGCONFIG, statement)
    # document the snippet is generated from
    doc = _build_doc(A, B, C, D)
    # the snippet with results highlighted
    snippet = func.ts_headline(REGCONFIG, doc, tsq, "StartSel=**,StopSel=**").label("snippet")

    if title_only:
        rank = sim.label("rank")
        is_match = sim > TRI_SIMILARITY_THRESHOLD
    else:
        # the tsvector that the tsquery is matched against
        tsv = _build_tsv(A, B, C, D)
        # weigh the similarity a lot, the text-based ranking less
        rank = (TRI_SIMILARITY_WEIGHT * sim + func.ts_rank_cd(tsv, tsq)).label("rank")
        is_match = or_(tsv.op("@@")(tsq), sim > TRI_SIMILARITY_THRESHOLD)

    # pagination cut-off: only rows ranked at or below the token, or no restriction on the first page
    within_page = rank <= next_rank if next_rank is not None else True

    def execute_search_statement(session, orig_statement):
        """
        Does the right search filtering, limiting, and ordering for the initial statement
        """
        narrowed = orig_statement.where(is_match).where(within_page)
        ordered = narrowed.order_by(rank.desc()).limit(page_size + 1)
        return session.execute(ordered).all()

    return rank, snippet, execute_search_statement
def _search_users(session, search_statement, title_only, next_rank, page_size, context, include_users):
    """
    Runs the text search over users visible in this context and returns them as search_pb2.Result messages.

    Returns an empty list without querying when include_users is falsy. Username and name form the title
    set; city, about_me and the remaining free-text profile fields follow in decreasing weight.
    """
    if not include_users:
        return []

    rank_expr, snippet_expr, run_search = _gen_search_elements(
        search_statement,
        title_only,
        next_rank,
        page_size,
        [User.username, User.name],
        [User.city],
        [User.about_me],
        [User.things_i_like, User.about_place, User.additional_information],
    )

    visible_users = select(User, rank_expr, snippet_expr).where_users_visible(context)

    results = []
    for user, user_rank, user_snippet in run_search(session, visible_users):
        results.append(
            search_pb2.Result(
                rank=user_rank,
                user=user_model_to_pb(user, session, context),
                snippet=user_snippet,
            )
        )
    return results
def _search_pages(session, search_statement, title_only, next_rank, page_size, context, include_places, include_guides):
    """
    Runs the text search over the latest version of place and/or guide pages.

    Returns a list of search_pb2.Result with either `place` or `guide` set depending on the page type, or an
    empty list (without building or running any query) when neither include_places nor include_guides is set.
    """
    # bail out before doing any work, same as the user and cluster searches
    if not include_places and not include_guides:
        return []

    rank, snippet, execute_search_statement = _gen_search_elements(
        search_statement,
        title_only,
        next_rank,
        page_size,
        [PageVersion.title],
        [PageVersion.address],
        [],
        [PageVersion.content],
    )

    wanted_types = []
    if include_places:
        wanted_types.append(PageType.place)
    if include_guides:
        wanted_types.append(PageType.guide)

    # the highest PageVersion id per page, restricted to the requested page types
    latest_pages = (
        select(func.max(PageVersion.id).label("id"))
        .join(Page, Page.id == PageVersion.page_id)
        .where(Page.type.in_(wanted_types))
        .group_by(PageVersion.page_id)
        .subquery()
    )

    pages = execute_search_statement(
        session,
        select(Page, rank, snippet)
        .join(PageVersion, PageVersion.page_id == Page.id)
        .join(latest_pages, latest_pages.c.id == PageVersion.id),
    )

    return [
        search_pb2.Result(
            rank=page_rank,
            place=page_to_pb(session, page, context) if page.type == PageType.place else None,
            guide=page_to_pb(session, page, context) if page.type == PageType.guide else None,
            snippet=page_snippet,
        )
        for page, page_rank, page_snippet in pages
    ]
def _search_events(session, search_statement, title_only, next_rank, page_size, context):
    """
    Runs the text search over event occurrences that have not ended yet.

    The event title is the title set; the occurrence address and link, then its content, follow in
    decreasing weight. Deleted occurrences are left out, matching what EventSearch does. Returns a list of
    search_pb2.Result with `event` set.
    """
    rank, snippet, execute_search_statement = _gen_search_elements(
        search_statement,
        title_only,
        next_rank,
        page_size,
        [Event.title],
        [EventOccurrence.address, EventOccurrence.link],
        [],
        [EventOccurrence.content],
    )

    occurrences = execute_search_statement(
        session,
        select(EventOccurrence, rank, snippet)
        .join(Event, Event.id == EventOccurrence.event_id)
        # deleted occurrences must not surface in search results
        .where(~EventOccurrence.is_deleted)
        .where(EventOccurrence.end_time >= func.now()),
    )

    return [
        search_pb2.Result(
            rank=occurrence_rank,
            event=event_to_pb(session, occurrence, context),
            snippet=occurrence_snippet,
        )
        for occurrence, occurrence_rank, occurrence_snippet in occurrences
    ]
def _search_clusters(
    session, search_statement, title_only, next_rank, page_size, context, include_communities, include_groups
):
    """
    Runs the text search over clusters, matched through the latest version of their main page.

    Official clusters are returned as `community` results (built from cluster.official_cluster_for_node),
    all other clusters as `group` results. When only one of include_communities / include_groups is set the
    query is restricted to that kind; when neither is set an empty list is returned without querying.
    """
    if not (include_communities or include_groups):
        return []

    only_communities = include_communities and not include_groups
    only_groups = include_groups and not include_communities

    rank_expr, snippet_expr, run_search = _gen_search_elements(
        search_statement,
        title_only,
        next_rank,
        page_size,
        [Cluster.name],
        [PageVersion.address, PageVersion.title],
        [Cluster.description],
        [PageVersion.content],
    )

    # the highest PageVersion id of every main page
    newest_main_pages = (
        select(func.max(PageVersion.id).label("id"))
        .join(Page, Page.id == PageVersion.page_id)
        .where(Page.type == PageType.main_page)
        .group_by(PageVersion.page_id)
        .subquery()
    )

    cluster_statement = (
        select(Cluster, rank_expr, snippet_expr)
        .join(Page, Page.owner_cluster_id == Cluster.id)
        .join(PageVersion, PageVersion.page_id == Page.id)
        .join(newest_main_pages, newest_main_pages.c.id == PageVersion.id)
        .where(Cluster.is_official_cluster if only_communities else True)
        .where(~Cluster.is_official_cluster if only_groups else True)
    )

    results = []
    for cluster, cluster_rank, cluster_snippet in run_search(session, cluster_statement):
        if cluster.is_official_cluster:
            community = community_to_pb(session, cluster.official_cluster_for_node, context)
            group = None
        else:
            community = None
            group = group_to_pb(session, cluster, context)
        results.append(
            search_pb2.Result(
                rank=cluster_rank,
                community=community,
                group=group,
                snippet=cluster_snippet,
            )
        )
    return results
# User columns filtered by plain equality against the wrapped request field of the same name
_USER_EQUALITY_FILTER_FIELDS = (
    "last_minute",
    "has_pets",
    "accepts_pets",
    "has_kids",
    "accepts_kids",
    "has_housemates",
    "wheelchair_accessible",
    "smokes_at_home",
    "drinking_allowed",
    "drinks_at_home",
    "parking",
    "camping_ok",
)


def _ilike_any(columns, needle):
    """
    Builds an OR of case-insensitive substring matches (ILIKE '%needle%') of needle against each column.

    NOTE(review): needle goes into the LIKE pattern unescaped, so any "%" or "_" in it acts as a wildcard.
    """
    pattern = f"%{needle}%"
    return or_(*[column.ilike(pattern) for column in columns])


def _apply_location_filters(statement, request, geom, session, context):
    """
    Applies the search_in_area / search_in_rectangle / search_in_community_id filters of request to statement.

    geom is the geometry column being filtered (User.geom or EventOccurrence.geom). Each filter is applied
    only when the corresponding field is set on the request. Aborts with NOT_FOUND if the requested community
    does not exist. Returns the filtered statement.
    """
    if request.HasField("search_in_area"):
        # EPSG4326 measures distance in decimal degrees
        # we want to check whether two circles overlap, so check if the distance between their centers is less
        # than the sum of their radii, divided by 111111 m ~= 1 degree (at the equator)
        #
        # a fixed 1000 is used in place of a per-row radius (this used to be User.geom_radius): it's an
        # optimization that speeds up the db queries since the radius doesn't need to be looked up
        search_point = create_coordinate(request.search_in_area.lat, request.search_in_area.lng)
        statement = statement.where(
            func.ST_DWithin(geom, search_point, (1000 + request.search_in_area.radius) / 111111)
        )
    if request.HasField("search_in_rectangle"):
        rectangle = request.search_in_rectangle
        statement = statement.where(
            func.ST_Within(
                geom,
                func.ST_MakeEnvelope(
                    rectangle.lng_min,
                    rectangle.lat_min,
                    rectangle.lng_max,
                    rectangle.lat_max,
                    4326,
                ),
            )
        )
    if request.HasField("search_in_community_id"):
        # could do a join here as well, but this is just simpler
        node = session.execute(select(Node).where(Node.id == request.search_in_community_id)).scalar_one_or_none()
        if not node:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
        statement = statement.where(func.ST_Contains(node.geom, geom))
    return statement


class Search(search_pb2_grpc.SearchServicer):
    """
    Servicer implementing the Search, UserSearch and EventSearch methods.
    """

    def Search(self, request, context, session):
        """
        Text search across users, pages, events and clusters, merged into one list ordered by rank.

        Each of the four sub-searches fetches up to page_size + 1 rows; the merged list is sorted by
        descending rank and cut to page_size. The page token is the rank of the first result that did not
        fit, as a string; the next request only considers results ranked at or below it.
        """
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        # this is not an ideal page token, some results have equal rank (unlikely)
        next_rank = float(request.page_token) if request.page_token else None

        # positional arguments shared by all four sub-searches
        common = (session, request.query, request.title_only, next_rank, page_size, context)

        all_results = (
            _search_users(*common, request.include_users)
            + _search_pages(*common, request.include_places, request.include_guides)
            + _search_events(*common)
            + _search_clusters(*common, request.include_communities, request.include_groups)
        )
        all_results.sort(key=lambda result: result.rank, reverse=True)

        return search_pb2.SearchRes(
            results=all_results[:page_size],
            next_page_token=str(all_results[page_size].rank) if len(all_results) > page_size else None,
        )

    def UserSearch(self, request, context, session):
        """
        Filtered search over visible users, ordered by descending recommendation score.

        If exactly_user_ids is non-empty, only those users are returned and every other filter is ignored.
        Filtering by gender aborts with FAILED_PRECONDITION unless the caller has strong verification and
        includes their own gender in the filter. total_items is the number of users matching the filters
        before pagination. The page token is the encrypted recommendation score to continue from.
        """
        user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()

        # Base statement with visibility filter
        statement = select(User).where_users_visible(context)

        # If exactly_user_ids is present, only filter by those IDs and ignore all other filters
        # This is a bit of a hacky feature to help with the frontend map implementation
        if len(request.exactly_user_ids) > 0:
            statement = statement.where(User.id.in_(request.exactly_user_ids))
        else:
            # Apply all the normal filters
            if request.HasField("query"):
                if request.query_name_only:
                    searched_columns = [User.name, User.username]
                else:
                    searched_columns = [
                        User.name,
                        User.username,
                        User.city,
                        User.hometown,
                        User.about_me,
                        User.things_i_like,
                        User.about_place,
                        User.additional_information,
                    ]
                statement = statement.where(_ilike_any(searched_columns, request.query.value))

            if request.HasField("last_active"):
                raw_dt = to_aware_datetime(request.last_active)
                statement = statement.where(User.last_active >= last_active_coarsen(raw_dt))

            if len(request.gender) > 0:
                if not has_strong_verification(session, user):
                    context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NEED_STRONG_VERIFICATION)
                elif user.gender not in request.gender:
                    context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.MUST_INCLUDE_OWN_GENDER)
                else:
                    statement = statement.where(User.gender.in_(request.gender))

            # (requested values, column, proto -> sql mapping) for each "one of these values" filter
            enum_filters = (
                (request.hosting_status_filter, User.hosting_status, hostingstatus2sql),
                (request.meetup_status_filter, User.meetup_status, meetupstatus2sql),
                (request.smoking_location_filter, User.smoking_allowed, smokinglocation2sql),
                (request.sleeping_arrangement_filter, User.sleeping_arrangement, sleepingarrangement2sql),
                (request.parking_details_filter, User.parking_details, parkingdetails2sql),
            )
            for wanted, column, to_sql in enum_filters:
                if len(wanted) > 0:
                    statement = statement.where(column.in_([to_sql[value] for value in wanted]))

            # limits/default could be handled on the front end as well
            min_age = request.age_min.value if request.HasField("age_min") else 18
            max_age = request.age_max.value if request.HasField("age_max") else 200

            statement = statement.where((User.age >= min_age) & (User.age <= max_age))

            # a user matches if they have any of the requested languages at the requested fluency or above
            if len(request.language_ability_filter) > 0:
                language_options = []
                for ability_filter in request.language_ability_filter:
                    fluency_sql_value = fluency2sql.get(ability_filter.fluency)

                    # entries whose fluency has no sql equivalent are skipped
                    if fluency_sql_value is None:
                        continue
                    language_options.append(
                        and_(
                            LanguageAbility.language_code == ability_filter.code,
                            LanguageAbility.fluency >= fluency_sql_value,
                        )
                    )
                # Filter through a subquery rather than a join: a join yields one row per matching language,
                # so a user speaking several of the requested languages would be returned (and counted in
                # total_items) several times. If no usable entry is left, no language filter is applied.
                if language_options:
                    statement = statement.where(
                        User.id.in_(select(LanguageAbility.user_id).where(or_(*language_options)))
                    )

            if request.HasField("profile_completed"):
                statement = statement.where(User.has_completed_profile == request.profile_completed.value)
            if request.HasField("guests"):
                statement = statement.where(User.max_guests >= request.guests.value)

            for field_name in _USER_EQUALITY_FILTER_FIELDS:
                if request.HasField(field_name):
                    statement = statement.where(getattr(User, field_name) == getattr(request, field_name).value)

            statement = _apply_location_filters(statement, request, User.geom, session, context)

            if request.only_with_references:
                # users who received at least one reference from a user visible in this context
                references = (
                    select(Reference.to_user_id.label("user_id"))
                    .where_users_column_visible(context, Reference.from_user_id)
                    .distinct()
                    .subquery()
                )
                statement = statement.join(references, references.c.user_id == User.id)

            if request.only_with_strong_verification:
                statement = statement.join(
                    StrongVerificationAttempt,
                    and_(
                        StrongVerificationAttempt.user_id == User.id,
                        StrongVerificationAttempt.has_strong_verification(User),
                    ),
                )
            # TODO:
            # bool friends_only = 13;

        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        # without a token, start from a score higher than any real one
        next_recommendation_score = float(decrypt_page_token(request.page_token)) if request.page_token else 1e10
        # counted before the pagination restriction is applied
        total_items = session.execute(select(func.count()).select_from(statement.subquery())).scalar()

        statement = (
            statement.where(User.recommendation_score <= next_recommendation_score)
            .order_by(User.recommendation_score.desc())
            .limit(page_size + 1)
        )
        users = session.execute(statement).scalars().all()

        return search_pb2.UserSearchRes(
            results=[
                search_pb2.Result(
                    rank=1,
                    user=user_model_to_pb(user, session, context),
                )
                for user in users[:page_size]
            ],
            # the extra (page_size + 1)-th row is the first one of the next page
            next_page_token=(
                encrypt_page_token(str(users[-1].recommendation_score)) if len(users) > page_size else None
            ),
            total_items=total_items,
        )

    def EventSearch(self, request, context, session):
        """
        Filtered search over non-deleted event occurrences.

        Upcoming occurrences are returned in ascending start time order; with request.past set, past ones in
        descending order. Pagination is either by page_number (offset based) or by page_token, a unix
        timestamp in milliseconds; page_token is ignored when page_number is given. total_items is the
        number of matching occurrences before limit/offset.
        """
        statement = (
            select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted)
        )

        if request.HasField("query"):
            if request.query_title_only:
                searched_columns = [Event.title]
            else:
                searched_columns = [Event.title, EventOccurrence.content, EventOccurrence.address]
            statement = statement.where(_ilike_any(searched_columns, request.query.value))

        # online/offline is decided by whether the occurrence has a geom
        if request.only_online:
            statement = statement.where(EventOccurrence.geom.is_(None))
        elif request.only_offline:
            statement = statement.where(EventOccurrence.geom.is_not(None))

        if request.subscribed or request.attending or request.organizing or request.my_communities:
            # the requested relations are OR-ed together: matching any one of them is enough
            where_ = []

            if request.subscribed:
                statement = statement.outerjoin(
                    EventSubscription,
                    and_(EventSubscription.event_id == Event.id, EventSubscription.user_id == context.user_id),
                )
                where_.append(EventSubscription.user_id.is_not(None))
            if request.organizing:
                statement = statement.outerjoin(
                    EventOrganizer,
                    and_(EventOrganizer.event_id == Event.id, EventOrganizer.user_id == context.user_id),
                )
                where_.append(EventOrganizer.user_id.is_not(None))
            if request.attending:
                statement = statement.outerjoin(
                    EventOccurrenceAttendee,
                    and_(
                        EventOccurrenceAttendee.occurrence_id == EventOccurrence.id,
                        EventOccurrenceAttendee.user_id == context.user_id,
                    ),
                )
                where_.append(EventOccurrenceAttendee.user_id.is_not(None))
            if request.my_communities:
                # nodes whose official cluster the user is subscribed to
                my_communities = (
                    session.execute(
                        select(Node.id)
                        .join(Cluster, Cluster.parent_node_id == Node.id)
                        .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
                        .where(ClusterSubscription.user_id == context.user_id)
                        .where(Cluster.is_official_cluster)
                        .order_by(Node.id)
                        .limit(100000)
                    )
                    .scalars()
                    .all()
                )
                where_.append(Event.parent_node_id.in_(my_communities))

            statement = statement.where(or_(*where_))

        if not request.include_cancelled:
            statement = statement.where(~EventOccurrence.is_cancelled)

        statement = _apply_location_filters(statement, request, EventOccurrence.geom, session, context)

        if request.HasField("after"):
            statement = statement.where(EventOccurrence.start_time > to_aware_datetime(request.after))
        if request.HasField("before"):
            statement = statement.where(EventOccurrence.end_time < to_aware_datetime(request.before))

        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        # the page token is a unix timestamp of where we left off
        page_token = (
            dt_from_millis(int(request.page_token)) if request.page_token and not request.page_number else now()
        )
        page_number = request.page_number or 1
        # Calculate the offset for pagination
        offset = (page_number - 1) * page_size

        if not request.past:
            statement = statement.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
                EventOccurrence.start_time.asc()
            )
        else:
            statement = statement.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
                EventOccurrence.start_time.desc()
            )

        total_items = session.execute(select(func.count()).select_from(statement.subquery())).scalar()
        # Apply pagination by page number; token based pagination fetches one extra row to detect a next page
        statement = statement.offset(offset).limit(page_size) if request.page_number else statement.limit(page_size + 1)
        occurrences = session.execute(statement).scalars().all()

        return search_pb2.EventSearchRes(
            events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
            next_page_token=(str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None),
            total_items=total_items,
        )