Coverage for src/couchers/servicers/search.py: 83%
247 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-03-11 15:27 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-03-11 15:27 +0000
1"""
2See //docs/search.md for overview.
3"""
5from datetime import timedelta
7import grpc
8from sqlalchemy.sql import and_, func, or_
10from couchers import errors
11from couchers.crypto import decrypt_page_token, encrypt_page_token
12from couchers.models import (
13 Cluster,
14 ClusterSubscription,
15 Event,
16 EventOccurrence,
17 EventOccurrenceAttendee,
18 EventOrganizer,
19 EventSubscription,
20 LanguageAbility,
21 Node,
22 Page,
23 PageType,
24 PageVersion,
25 Reference,
26 StrongVerificationAttempt,
27 User,
28)
29from couchers.servicers.account import has_strong_verification
30from couchers.servicers.api import (
31 fluency2sql,
32 hostingstatus2sql,
33 meetupstatus2sql,
34 parkingdetails2sql,
35 sleepingarrangement2sql,
36 smokinglocation2sql,
37 user_model_to_pb,
38)
39from couchers.servicers.communities import community_to_pb
40from couchers.servicers.events import event_to_pb
41from couchers.servicers.groups import group_to_pb
42from couchers.servicers.pages import page_to_pb
43from couchers.sql import couchers_select as select
44from couchers.utils import (
45 create_coordinate,
46 dt_from_millis,
47 last_active_coarsen,
48 millis_from_dt,
49 now,
50 to_aware_datetime,
51)
52from proto import search_pb2, search_pb2_grpc
54# searches are a bit expensive, we'd rather send back a bunch of results at once than lots of small pages
55MAX_PAGINATION_LENGTH = 100
57REGCONFIG = "english"
58TRI_SIMILARITY_THRESHOLD = 0.6
59TRI_SIMILARITY_WEIGHT = 5
62def _join_with_space(coalesces):
63 # the objects in coalesces are not strings, so we can't do " ".join(coalesces). They're SQLAlchemy magic.
64 if not coalesces:
65 return ""
66 out = coalesces[0]
67 for coalesce in coalesces[1:]:
68 out += " " + coalesce
69 return out
72def _build_tsv(A, B=None, C=None, D=None):
73 """
74 Given lists for A, B, C, and D, builds a tsvector from them.
75 """
76 B = B or []
77 C = C or []
78 D = D or []
79 tsv = func.setweight(func.to_tsvector(REGCONFIG, _join_with_space([func.coalesce(bit, "") for bit in A])), "A")
80 if B:
81 tsv = tsv.concat(
82 func.setweight(func.to_tsvector(REGCONFIG, _join_with_space([func.coalesce(bit, "") for bit in B])), "B")
83 )
84 if C:
85 tsv = tsv.concat(
86 func.setweight(func.to_tsvector(REGCONFIG, _join_with_space([func.coalesce(bit, "") for bit in C])), "C")
87 )
88 if D:
89 tsv = tsv.concat(
90 func.setweight(func.to_tsvector(REGCONFIG, _join_with_space([func.coalesce(bit, "") for bit in D])), "D")
91 )
92 return tsv
95def _build_doc(A, B=None, C=None, D=None):
96 """
97 Builds the raw document (without to_tsvector and weighting), used for extracting snippet
98 """
99 B = B or []
100 C = C or []
101 D = D or []
102 doc = _join_with_space([func.coalesce(bit, "") for bit in A])
103 if B:
104 doc += " " + _join_with_space([func.coalesce(bit, "") for bit in B])
105 if C:
106 doc += " " + _join_with_space([func.coalesce(bit, "") for bit in C])
107 if D:
108 doc += " " + _join_with_space([func.coalesce(bit, "") for bit in D])
109 return doc
112def _similarity(statement, text):
113 return func.word_similarity(func.unaccent(statement), func.unaccent(text))
116def _gen_search_elements(statement, title_only, next_rank, page_size, A, B=None, C=None, D=None):
117 """
118 Given an sql statement and four sets of fields, (A, B, C, D), generates a bunch of postgres expressions for full text search.
120 The four sets are in decreasing order of "importance" for ranking.
122 A should be the "title", the others can be anything.
124 If title_only=True, we only perform a trigram search against A only
125 """
126 B = B or []
127 C = C or []
128 D = D or []
129 if not title_only:
130 # a postgres tsquery object that can be used to match against a tsvector
131 tsq = func.websearch_to_tsquery(REGCONFIG, statement)
133 # the tsvector object that we want to search against with our tsquery
134 tsv = _build_tsv(A, B, C, D)
136 # document to generate snippet from
137 doc = _build_doc(A, B, C, D)
139 title = _build_doc(A)
141 # trigram based text similarity between title and sql statement string
142 sim = _similarity(statement, title)
144 # ranking algo, weigh the similarity a lot, the text-based ranking less
145 rank = (TRI_SIMILARITY_WEIGHT * sim + func.ts_rank_cd(tsv, tsq)).label("rank")
147 # the snippet with results highlighted
148 snippet = func.ts_headline(REGCONFIG, doc, tsq, "StartSel=**,StopSel=**").label("snippet")
150 def execute_search_statement(session, orig_statement):
151 """
152 Does the right search filtering, limiting, and ordering for the initial statement
153 """
154 return session.execute(
155 orig_statement.where(or_(tsv.op("@@")(tsq), sim > TRI_SIMILARITY_THRESHOLD))
156 .where(rank <= next_rank if next_rank is not None else True)
157 .order_by(rank.desc())
158 .limit(page_size + 1)
159 ).all()
161 else:
162 title = _build_doc(A)
164 # trigram based text similarity between title and sql statement string
165 sim = _similarity(statement, title)
167 # ranking algo, weigh the similarity a lot, the text-based ranking less
168 rank = sim.label("rank")
170 # used only for headline
171 tsq = func.websearch_to_tsquery(REGCONFIG, statement)
172 doc = _build_doc(A, B, C, D)
174 # the snippet with results highlighted
175 snippet = func.ts_headline(REGCONFIG, doc, tsq, "StartSel=**,StopSel=**").label("snippet")
177 def execute_search_statement(session, orig_statement):
178 """
179 Does the right search filtering, limiting, and ordering for the initial statement
180 """
181 return session.execute(
182 orig_statement.where(sim > TRI_SIMILARITY_THRESHOLD)
183 .where(rank <= next_rank if next_rank is not None else True)
184 .order_by(rank.desc())
185 .limit(page_size + 1)
186 ).all()
188 return rank, snippet, execute_search_statement
191def _search_users(session, search_statement, title_only, next_rank, page_size, context, include_users):
192 if not include_users:
193 return []
194 rank, snippet, execute_search_statement = _gen_search_elements(
195 search_statement,
196 title_only,
197 next_rank,
198 page_size,
199 [User.username, User.name],
200 [User.city],
201 [User.about_me],
202 [User.things_i_like, User.about_place, User.additional_information],
203 )
205 users = execute_search_statement(session, select(User, rank, snippet).where_users_visible(context))
207 return [
208 search_pb2.Result(
209 rank=rank,
210 user=user_model_to_pb(page, session, context),
211 snippet=snippet,
212 )
213 for page, rank, snippet in users
214 ]
217def _search_pages(session, search_statement, title_only, next_rank, page_size, context, include_places, include_guides):
218 rank, snippet, execute_search_statement = _gen_search_elements(
219 search_statement,
220 title_only,
221 next_rank,
222 page_size,
223 [PageVersion.title],
224 [PageVersion.address],
225 [],
226 [PageVersion.content],
227 )
228 if not include_places and not include_guides:
229 return []
231 latest_pages = (
232 select(func.max(PageVersion.id).label("id"))
233 .join(Page, Page.id == PageVersion.page_id)
234 .where(
235 or_(
236 (Page.type == PageType.place) if include_places else False,
237 (Page.type == PageType.guide) if include_guides else False,
238 )
239 )
240 .group_by(PageVersion.page_id)
241 .subquery()
242 )
244 pages = execute_search_statement(
245 session,
246 select(Page, rank, snippet)
247 .join(PageVersion, PageVersion.page_id == Page.id)
248 .join(latest_pages, latest_pages.c.id == PageVersion.id),
249 )
251 return [
252 search_pb2.Result(
253 rank=rank,
254 place=page_to_pb(session, page, context) if page.type == PageType.place else None,
255 guide=page_to_pb(session, page, context) if page.type == PageType.guide else None,
256 snippet=snippet,
257 )
258 for page, rank, snippet in pages
259 ]
262def _search_events(session, search_statement, title_only, next_rank, page_size, context):
263 rank, snippet, execute_search_statement = _gen_search_elements(
264 search_statement,
265 title_only,
266 next_rank,
267 page_size,
268 [Event.title],
269 [EventOccurrence.address, EventOccurrence.link],
270 [],
271 [EventOccurrence.content],
272 )
274 occurrences = execute_search_statement(
275 session,
276 select(EventOccurrence, rank, snippet)
277 .join(Event, Event.id == EventOccurrence.event_id)
278 .where(EventOccurrence.end_time >= func.now()),
279 )
281 return [
282 search_pb2.Result(
283 rank=rank,
284 event=event_to_pb(session, occurrence, context),
285 snippet=snippet,
286 )
287 for occurrence, rank, snippet in occurrences
288 ]
291def _search_clusters(
292 session, search_statement, title_only, next_rank, page_size, context, include_communities, include_groups
293):
294 if not include_communities and not include_groups:
295 return []
297 rank, snippet, execute_search_statement = _gen_search_elements(
298 search_statement,
299 title_only,
300 next_rank,
301 page_size,
302 [Cluster.name],
303 [PageVersion.address, PageVersion.title],
304 [Cluster.description],
305 [PageVersion.content],
306 )
308 latest_pages = (
309 select(func.max(PageVersion.id).label("id"))
310 .join(Page, Page.id == PageVersion.page_id)
311 .where(Page.type == PageType.main_page)
312 .group_by(PageVersion.page_id)
313 .subquery()
314 )
316 clusters = execute_search_statement(
317 session,
318 select(Cluster, rank, snippet)
319 .join(Page, Page.owner_cluster_id == Cluster.id)
320 .join(PageVersion, PageVersion.page_id == Page.id)
321 .join(latest_pages, latest_pages.c.id == PageVersion.id)
322 .where(Cluster.is_official_cluster if include_communities and not include_groups else True)
323 .where(~Cluster.is_official_cluster if not include_communities and include_groups else True),
324 )
326 return [
327 search_pb2.Result(
328 rank=rank,
329 community=(
330 community_to_pb(session, cluster.official_cluster_for_node, context)
331 if cluster.is_official_cluster
332 else None
333 ),
334 group=group_to_pb(session, cluster, context) if not cluster.is_official_cluster else None,
335 snippet=snippet,
336 )
337 for cluster, rank, snippet in clusters
338 ]
341class Search(search_pb2_grpc.SearchServicer):
342 def Search(self, request, context, session):
343 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
344 # this is not an ideal page token, some results have equal rank (unlikely)
345 next_rank = float(request.page_token) if request.page_token else None
347 all_results = (
348 _search_users(
349 session,
350 request.query,
351 request.title_only,
352 next_rank,
353 page_size,
354 context,
355 request.include_users,
356 )
357 + _search_pages(
358 session,
359 request.query,
360 request.title_only,
361 next_rank,
362 page_size,
363 context,
364 request.include_places,
365 request.include_guides,
366 )
367 + _search_events(
368 session,
369 request.query,
370 request.title_only,
371 next_rank,
372 page_size,
373 context,
374 )
375 + _search_clusters(
376 session,
377 request.query,
378 request.title_only,
379 next_rank,
380 page_size,
381 context,
382 request.include_communities,
383 request.include_groups,
384 )
385 )
386 all_results.sort(key=lambda result: result.rank, reverse=True)
387 return search_pb2.SearchRes(
388 results=all_results[:page_size],
389 next_page_token=str(all_results[page_size].rank) if len(all_results) > page_size else None,
390 )
392 def UserSearch(self, request, context, session):
393 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
395 statement = select(User).where_users_visible(context)
396 if request.HasField("query"):
397 if request.query_name_only:
398 statement = statement.where(
399 or_(User.name.ilike(f"%{request.query.value}%"), User.username.ilike(f"%{request.query.value}%"))
400 )
401 else:
402 statement = statement.where(
403 or_(
404 User.name.ilike(f"%{request.query.value}%"),
405 User.username.ilike(f"%{request.query.value}%"),
406 User.city.ilike(f"%{request.query.value}%"),
407 User.hometown.ilike(f"%{request.query.value}%"),
408 User.about_me.ilike(f"%{request.query.value}%"),
409 User.things_i_like.ilike(f"%{request.query.value}%"),
410 User.about_place.ilike(f"%{request.query.value}%"),
411 User.additional_information.ilike(f"%{request.query.value}%"),
412 )
413 )
415 if request.HasField("last_active"):
416 raw_dt = to_aware_datetime(request.last_active)
417 statement = statement.where(User.last_active >= last_active_coarsen(raw_dt))
419 if len(request.gender) > 0:
420 if not has_strong_verification(session, user):
421 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NEED_STRONG_VERIFICATION)
422 elif user.gender not in request.gender:
423 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.MUST_INCLUDE_OWN_GENDER)
424 else:
425 statement = statement.where(User.gender.in_(request.gender))
427 if len(request.hosting_status_filter) > 0:
428 statement = statement.where(
429 User.hosting_status.in_([hostingstatus2sql[status] for status in request.hosting_status_filter])
430 )
431 if len(request.meetup_status_filter) > 0:
432 statement = statement.where(
433 User.meetup_status.in_([meetupstatus2sql[status] for status in request.meetup_status_filter])
434 )
435 if len(request.smoking_location_filter) > 0:
436 statement = statement.where(
437 User.smoking_allowed.in_([smokinglocation2sql[loc] for loc in request.smoking_location_filter])
438 )
439 if len(request.sleeping_arrangement_filter) > 0:
440 statement = statement.where(
441 User.sleeping_arrangement.in_(
442 [sleepingarrangement2sql[arr] for arr in request.sleeping_arrangement_filter]
443 )
444 )
445 if len(request.parking_details_filter) > 0:
446 statement = statement.where(
447 User.parking_details.in_([parkingdetails2sql[det] for det in request.parking_details_filter])
448 )
449 # limits/default could be handled on the front end as well
450 min_age = request.age_min.value if request.HasField("age_min") else 18
451 max_age = request.age_max.value if request.HasField("age_max") else 200
453 statement = statement.where((User.age >= min_age) & (User.age <= max_age))
455 # return results with by language code as only input
456 # fluency in conversational or fluent
458 if len(request.language_ability_filter) > 0:
459 language_options = []
460 for ability_filter in request.language_ability_filter:
461 fluency_sql_value = fluency2sql.get(ability_filter.fluency)
463 if fluency_sql_value is None:
464 continue
465 language_options.append(
466 and_(
467 (LanguageAbility.language_code == ability_filter.code),
468 (LanguageAbility.fluency >= (fluency_sql_value)),
469 )
470 )
471 statement = statement.join(LanguageAbility, LanguageAbility.user_id == User.id)
472 statement = statement.where(or_(*language_options))
474 if request.HasField("profile_completed"):
475 statement = statement.where(User.has_completed_profile == request.profile_completed.value)
476 if request.HasField("guests"):
477 statement = statement.where(User.max_guests >= request.guests.value)
478 if request.HasField("last_minute"):
479 statement = statement.where(User.last_minute == request.last_minute.value)
480 if request.HasField("has_pets"):
481 statement = statement.where(User.has_pets == request.has_pets.value)
482 if request.HasField("accepts_pets"):
483 statement = statement.where(User.accepts_pets == request.accepts_pets.value)
484 if request.HasField("has_kids"):
485 statement = statement.where(User.has_kids == request.has_kids.value)
486 if request.HasField("accepts_kids"):
487 statement = statement.where(User.accepts_kids == request.accepts_kids.value)
488 if request.HasField("has_housemates"):
489 statement = statement.where(User.has_housemates == request.has_housemates.value)
490 if request.HasField("wheelchair_accessible"):
491 statement = statement.where(User.wheelchair_accessible == request.wheelchair_accessible.value)
492 if request.HasField("smoking_allowed"):
493 statement = statement.where(User.smoking_allowed == request.smoking_allowed.value)
494 if request.HasField("smokes_at_home"):
495 statement = statement.where(User.smokes_at_home == request.smokes_at_home.value)
496 if request.HasField("drinking_allowed"):
497 statement = statement.where(User.drinking_allowed == request.drinking_allowed.value)
498 if request.HasField("drinks_at_home"):
499 statement = statement.where(User.drinks_at_home == request.drinks_at_home.value)
500 if request.HasField("parking"):
501 statement = statement.where(User.parking == request.parking.value)
502 if request.HasField("camping_ok"):
503 statement = statement.where(User.camping_ok == request.camping_ok.value)
505 if request.HasField("search_in_area"):
506 # EPSG4326 measures distance in decimal degress
507 # we want to check whether two circles overlap, so check if the distance between their centers is less
508 # than the sum of their radii, divided by 111111 m ~= 1 degree (at the equator)
509 search_point = create_coordinate(request.search_in_area.lat, request.search_in_area.lng)
510 statement = statement.where(
511 func.ST_DWithin(
512 # old:
513 # User.geom, search_point, (User.geom_radius + request.search_in_area.radius) / 111111
514 # this is an optimization that speeds up the db queries since it doesn't need to look up the user's geom radius
515 User.geom,
516 search_point,
517 (1000 + request.search_in_area.radius) / 111111,
518 )
519 )
520 if request.HasField("search_in_rectangle"):
521 statement = statement.where(
522 func.ST_Within(
523 User.geom,
524 func.ST_MakeEnvelope(
525 request.search_in_rectangle.lng_min,
526 request.search_in_rectangle.lat_min,
527 request.search_in_rectangle.lng_max,
528 request.search_in_rectangle.lat_max,
529 4326,
530 ),
531 )
532 )
533 if request.HasField("search_in_community_id"):
534 # could do a join here as well, but this is just simpler
535 node = session.execute(select(Node).where(Node.id == request.search_in_community_id)).scalar_one_or_none()
536 if not node:
537 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
538 statement = statement.where(func.ST_Contains(node.geom, User.geom))
540 if request.only_with_references:
541 references = (
542 select(Reference.to_user_id.label("user_id"))
543 .where_users_column_visible(context, Reference.from_user_id)
544 .distinct()
545 .subquery()
546 )
547 statement = statement.join(references, references.c.user_id == User.id)
549 if request.only_with_strong_verification:
550 statement = statement.join(
551 StrongVerificationAttempt,
552 and_(
553 StrongVerificationAttempt.user_id == User.id,
554 StrongVerificationAttempt.has_strong_verification(User),
555 ),
556 )
557 # TODO:
558 # bool friends_only = 13;
560 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
561 next_recommendation_score = float(decrypt_page_token(request.page_token)) if request.page_token else 1e10
562 total_items = session.execute(select(func.count()).select_from(statement.subquery())).scalar()
564 statement = (
565 statement.where(User.recommendation_score <= next_recommendation_score)
566 .order_by(User.recommendation_score.desc())
567 .limit(page_size + 1)
568 )
569 users = session.execute(statement).scalars().all()
571 return search_pb2.UserSearchRes(
572 results=[
573 search_pb2.Result(
574 rank=1,
575 user=user_model_to_pb(user, session, context),
576 )
577 for user in users[:page_size]
578 ],
579 next_page_token=(
580 encrypt_page_token(str(users[-1].recommendation_score)) if len(users) > page_size else None
581 ),
582 total_items=total_items,
583 )
585 def EventSearch(self, request, context, session):
586 statement = (
587 select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted)
588 )
590 if request.HasField("query"):
591 if request.query_title_only:
592 statement = statement.where(Event.title.ilike(f"%{request.query.value}%"))
593 else:
594 statement = statement.where(
595 or_(
596 Event.title.ilike(f"%{request.query.value}%"),
597 EventOccurrence.content.ilike(f"%{request.query.value}%"),
598 EventOccurrence.address.ilike(f"%{request.query.value}%"),
599 )
600 )
602 if request.only_online:
603 statement = statement.where(EventOccurrence.geom == None)
604 elif request.only_offline:
605 statement = statement.where(EventOccurrence.geom != None)
607 if request.subscribed or request.attending or request.organizing or request.my_communities:
608 where_ = []
610 if request.subscribed:
611 statement = statement.outerjoin(
612 EventSubscription,
613 and_(EventSubscription.event_id == Event.id, EventSubscription.user_id == context.user_id),
614 )
615 where_.append(EventSubscription.user_id != None)
616 if request.organizing:
617 statement = statement.outerjoin(
618 EventOrganizer,
619 and_(EventOrganizer.event_id == Event.id, EventOrganizer.user_id == context.user_id),
620 )
621 where_.append(EventOrganizer.user_id != None)
622 if request.attending:
623 statement = statement.outerjoin(
624 EventOccurrenceAttendee,
625 and_(
626 EventOccurrenceAttendee.occurrence_id == EventOccurrence.id,
627 EventOccurrenceAttendee.user_id == context.user_id,
628 ),
629 )
630 where_.append(EventOccurrenceAttendee.user_id != None)
631 if request.my_communities:
632 my_communities = (
633 session.execute(
634 select(Node.id)
635 .join(Cluster, Cluster.parent_node_id == Node.id)
636 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
637 .where(ClusterSubscription.user_id == context.user_id)
638 .where(Cluster.is_official_cluster)
639 .order_by(Node.id)
640 .limit(100000)
641 )
642 .scalars()
643 .all()
644 )
645 where_.append(Event.parent_node_id.in_(my_communities))
647 statement = statement.where(or_(*where_))
649 if not request.include_cancelled:
650 statement = statement.where(~EventOccurrence.is_cancelled)
652 if request.HasField("search_in_area"):
653 # EPSG4326 measures distance in decimal degress
654 # we want to check whether two circles overlap, so check if the distance between their centers is less
655 # than the sum of their radii, divided by 111111 m ~= 1 degree (at the equator)
656 search_point = create_coordinate(request.search_in_area.lat, request.search_in_area.lng)
657 statement = statement.where(
658 func.ST_DWithin(
659 # old:
660 # User.geom, search_point, (User.geom_radius + request.search_in_area.radius) / 111111
661 # this is an optimization that speeds up the db queries since it doesn't need to look up the user's geom radius
662 EventOccurrence.geom,
663 search_point,
664 (1000 + request.search_in_area.radius) / 111111,
665 )
666 )
667 if request.HasField("search_in_rectangle"):
668 statement = statement.where(
669 func.ST_Within(
670 EventOccurrence.geom,
671 func.ST_MakeEnvelope(
672 request.search_in_rectangle.lng_min,
673 request.search_in_rectangle.lat_min,
674 request.search_in_rectangle.lng_max,
675 request.search_in_rectangle.lat_max,
676 4326,
677 ),
678 )
679 )
680 if request.HasField("search_in_community_id"):
681 # could do a join here as well, but this is just simpler
682 node = session.execute(select(Node).where(Node.id == request.search_in_community_id)).scalar_one_or_none()
683 if not node:
684 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
685 statement = statement.where(func.ST_Contains(node.geom, EventOccurrence.geom))
687 if request.HasField("after"):
688 statement = statement.where(EventOccurrence.start_time > to_aware_datetime(request.after))
689 if request.HasField("before"):
690 statement = statement.where(EventOccurrence.end_time < to_aware_datetime(request.before))
692 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
693 # the page token is a unix timestamp of where we left off
694 page_token = (
695 dt_from_millis(int(request.page_token)) if request.page_token and not request.page_number else now()
696 )
697 page_number = request.page_number or 1
698 # Calculate the offset for pagination
699 offset = (page_number - 1) * page_size
701 if not request.past:
702 statement = statement.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
703 EventOccurrence.start_time.asc()
704 )
705 else:
706 statement = statement.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
707 EventOccurrence.start_time.desc()
708 )
710 total_items = session.execute(select(func.count()).select_from(statement.subquery())).scalar()
711 # Apply pagination by page number
712 statement = statement.offset(offset).limit(page_size) if request.page_number else statement.limit(page_size + 1)
713 occurrences = session.execute(statement).scalars().all()
715 return search_pb2.EventSearchRes(
716 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
717 next_page_token=(str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None),
718 total_items=total_items,
719 )