Coverage for src/couchers/servicers/search.py: 84%
245 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-03-24 14:08 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-03-24 14:08 +0000
1"""
2See //docs/search.md for overview.
3"""
5from datetime import timedelta
7import grpc
8from sqlalchemy.sql import and_, func, or_
10from couchers import errors
11from couchers.crypto import decrypt_page_token, encrypt_page_token
12from couchers.models import (
13 Cluster,
14 ClusterSubscription,
15 Event,
16 EventOccurrence,
17 EventOccurrenceAttendee,
18 EventOrganizer,
19 EventSubscription,
20 LanguageAbility,
21 Node,
22 Page,
23 PageType,
24 PageVersion,
25 Reference,
26 StrongVerificationAttempt,
27 User,
28)
29from couchers.servicers.account import has_strong_verification
30from couchers.servicers.api import (
31 fluency2sql,
32 hostingstatus2sql,
33 meetupstatus2sql,
34 parkingdetails2sql,
35 sleepingarrangement2sql,
36 smokinglocation2sql,
37 user_model_to_pb,
38)
39from couchers.servicers.communities import community_to_pb
40from couchers.servicers.events import event_to_pb
41from couchers.servicers.groups import group_to_pb
42from couchers.servicers.pages import page_to_pb
43from couchers.sql import couchers_select as select
44from couchers.utils import (
45 create_coordinate,
46 dt_from_millis,
47 last_active_coarsen,
48 millis_from_dt,
49 now,
50 to_aware_datetime,
51)
52from proto import search_pb2, search_pb2_grpc
# searches are a bit expensive, we'd rather send back a bunch of results at once than lots of small pages
# (used both as the default and as the upper bound for page_size in Search, UserSearch, and EventSearch)
MAX_PAGINATION_LENGTH = 100

# postgres text search configuration, passed to to_tsvector, websearch_to_tsquery, and ts_headline
REGCONFIG = "english"
# a title counts as a match when its trigram word similarity to the query is strictly above this
TRI_SIMILARITY_THRESHOLD = 0.6
# factor the trigram similarity is multiplied by before being added to ts_rank_cd in the combined rank
TRI_SIMILARITY_WEIGHT = 5
62def _join_with_space(coalesces):
63 # the objects in coalesces are not strings, so we can't do " ".join(coalesces). They're SQLAlchemy magic.
64 if not coalesces:
65 return ""
66 out = coalesces[0]
67 for coalesce in coalesces[1:]:
68 out += " " + coalesce
69 return out
def _build_tsv(A, B=None, C=None, D=None):
    """
    Builds a weighted tsvector expression out of up to four lists of columns.

    Each list is coalesced (NULL -> ""), joined with spaces, converted with to_tsvector and tagged with the
    weight of the same name ("A" to "D"). A is always included; B, C, and D are only concatenated on when
    they are non-empty.
    """

    def weighted_vector(bits, weight):
        text = _join_with_space([func.coalesce(bit, "") for bit in bits])
        return func.setweight(func.to_tsvector(REGCONFIG, text), weight)

    tsv = weighted_vector(A, "A")
    for bits, weight in ((B, "B"), (C, "C"), (D, "D")):
        if bits:
            tsv = tsv.concat(weighted_vector(bits, weight))
    return tsv
def _build_doc(A, B=None, C=None, D=None):
    """
    Builds the plain document expression (no to_tsvector, no weighting) for the given column lists.

    This is the text that snippets are extracted from. Every column is coalesced (NULL -> "") and all the
    pieces are joined with single spaces; empty B, C, or D lists are left out.
    """

    def joined(bits):
        return _join_with_space([func.coalesce(bit, "") for bit in bits])

    doc = joined(A)
    for bits in (B, C, D):
        if bits:
            doc = doc + (" " + joined(bits))
    return doc
def _similarity(statement, text):
    """
    Returns a word_similarity() expression comparing the search statement with text, both run through unaccent()
    """
    plain_statement = func.unaccent(statement)
    plain_text = func.unaccent(text)
    return func.word_similarity(plain_statement, plain_text)
def _gen_search_elements(statement, title_only, next_rank, page_size, A, B=None, C=None, D=None):
    """
    Given a search string and four sets of fields, (A, B, C, D), generates the postgres expressions needed for
    full text search.

    The four sets are in decreasing order of "importance" for ranking. A should be the "title", the others can
    be anything.

    If title_only=True, rows are matched and ranked purely on trigram similarity against A. Otherwise rows
    match on either the full text query or the trigram similarity, and the rank combines both.

    Returns a tuple (rank, snippet, execute_search_statement):
    * rank and snippet are labelled expressions to include in the select
    * execute_search_statement(session, orig_statement) applies the match filter, the next_rank page cutoff,
      descending rank ordering, and a limit of page_size + 1, then returns all rows
    """
    B = B or []
    C = C or []
    D = D or []

    # trigram based text similarity between the title and the search string
    title_similarity = _similarity(statement, _build_doc(A))

    # a postgres tsquery object; always needed for the headline, and for matching unless title_only
    tsq = func.websearch_to_tsquery(REGCONFIG, statement)

    # the snippet with results highlighted, always generated from the whole document
    full_doc = _build_doc(A, B, C, D)
    snippet = func.ts_headline(REGCONFIG, full_doc, tsq, "StartSel=**,StopSel=**").label("snippet")

    if title_only:
        tsv = None
        rank = title_similarity.label("rank")
    else:
        # the tsvector object that we want to search against with our tsquery
        tsv = _build_tsv(A, B, C, D)
        # ranking algo, weigh the similarity a lot, the text-based ranking less
        rank = (TRI_SIMILARITY_WEIGHT * title_similarity + func.ts_rank_cd(tsv, tsq)).label("rank")

    def execute_search_statement(session, orig_statement):
        """
        Does the right search filtering, limiting, and ordering for the initial statement
        """
        similar_enough = title_similarity > TRI_SIMILARITY_THRESHOLD
        if title_only:
            matches = similar_enough
        else:
            matches = or_(tsv.op("@@")(tsq), similar_enough)

        filtered = orig_statement.where(matches).where(rank <= next_rank if next_rank is not None else True)
        # one extra row so the caller can tell whether there's another page
        return session.execute(filtered.order_by(rank.desc()).limit(page_size + 1)).all()

    return rank, snippet, execute_search_statement
def _search_users(session, search_statement, title_only, next_rank, page_size, context, include_users):
    """
    Searches users visible in the given context and returns them as a list of search_pb2.Result.

    Username and name act as the title; city, about_me, and the remaining profile text fields follow in
    decreasing weight. Returns an empty list without querying when include_users is false.
    """
    if not include_users:
        return []

    rank, snippet, execute_search_statement = _gen_search_elements(
        search_statement,
        title_only,
        next_rank,
        page_size,
        A=[User.username, User.name],
        B=[User.city],
        C=[User.about_me],
        D=[User.things_i_like, User.about_place, User.additional_information],
    )

    visible_users = select(User, rank, snippet).where_users_visible(context)
    rows = execute_search_statement(session, visible_users)

    results = []
    for found_user, user_rank, user_snippet in rows:
        results.append(
            search_pb2.Result(
                rank=user_rank,
                user=user_model_to_pb(found_user, session, context),
                snippet=user_snippet,
            )
        )
    return results
def _search_pages(session, search_statement, title_only, next_rank, page_size, context, include_places, include_guides):
    """
    Searches place and/or guide pages and returns them as a list of search_pb2.Result.

    Only the version with the highest id of each page is searched. The title is the primary field, followed by
    the address and then the content. Each result has either its place or its guide field filled in, depending
    on the page type.

    Returns an empty list without building or running any query when neither include_places nor include_guides
    is set.
    """
    # bail out before doing any work, same as the other _search_* helpers
    if not include_places and not include_guides:
        return []

    rank, snippet, execute_search_statement = _gen_search_elements(
        search_statement,
        title_only,
        next_rank,
        page_size,
        [PageVersion.title],
        [PageVersion.address],
        [],
        [PageVersion.content],
    )

    # ids of the newest version of each page of a requested type
    latest_pages = (
        select(func.max(PageVersion.id).label("id"))
        .join(Page, Page.id == PageVersion.page_id)
        .where(
            or_(
                (Page.type == PageType.place) if include_places else False,
                (Page.type == PageType.guide) if include_guides else False,
            )
        )
        .group_by(PageVersion.page_id)
        .subquery()
    )

    pages = execute_search_statement(
        session,
        select(Page, rank, snippet)
        .join(PageVersion, PageVersion.page_id == Page.id)
        .join(latest_pages, latest_pages.c.id == PageVersion.id),
    )

    return [
        search_pb2.Result(
            rank=page_rank,
            place=page_to_pb(session, page, context) if page.type == PageType.place else None,
            guide=page_to_pb(session, page, context) if page.type == PageType.guide else None,
            snippet=page_snippet,
        )
        for page, page_rank, page_snippet in pages
    ]
def _search_events(session, search_statement, title_only, next_rank, page_size, context):
    """
    Searches event occurrences that haven't ended yet and returns them as a list of search_pb2.Result.

    The event title is the primary field, followed by the occurrence's address and link, and then its content.
    Deleted occurrences are left out, matching what EventSearch does.
    """
    rank, snippet, execute_search_statement = _gen_search_elements(
        search_statement,
        title_only,
        next_rank,
        page_size,
        [Event.title],
        [EventOccurrence.address, EventOccurrence.link],
        [],
        [EventOccurrence.content],
    )

    occurrences = execute_search_statement(
        session,
        select(EventOccurrence, rank, snippet)
        .join(Event, Event.id == EventOccurrence.event_id)
        # deleted occurrences should never surface in search results
        .where(~EventOccurrence.is_deleted)
        .where(EventOccurrence.end_time >= func.now()),
    )

    return [
        search_pb2.Result(
            rank=occurrence_rank,
            event=event_to_pb(session, occurrence, context),
            snippet=occurrence_snippet,
        )
        for occurrence, occurrence_rank, occurrence_snippet in occurrences
    ]
def _search_clusters(
    session, search_statement, title_only, next_rank, page_size, context, include_communities, include_groups
):
    """
    Searches clusters (communities and/or groups) and returns them as a list of search_pb2.Result.

    The cluster name is the primary field, followed by the address and title of its main page, the cluster
    description, and finally the main page content. Only the main page version with the highest id is used.

    Official clusters are returned as communities, all others as groups. Returns an empty list without
    querying when neither include_communities nor include_groups is set.
    """
    if not include_communities and not include_groups:
        return []

    rank, snippet, execute_search_statement = _gen_search_elements(
        search_statement,
        title_only,
        next_rank,
        page_size,
        A=[Cluster.name],
        B=[PageVersion.address, PageVersion.title],
        C=[Cluster.description],
        D=[PageVersion.content],
    )

    # ids of the newest version of each main page
    latest_pages = (
        select(func.max(PageVersion.id).label("id"))
        .join(Page, Page.id == PageVersion.page_id)
        .where(Page.type == PageType.main_page)
        .group_by(PageVersion.page_id)
        .subquery()
    )

    cluster_statement = (
        select(Cluster, rank, snippet)
        .join(Page, Page.owner_cluster_id == Cluster.id)
        .join(PageVersion, PageVersion.page_id == Page.id)
        .join(latest_pages, latest_pages.c.id == PageVersion.id)
    )
    # at least one of the two flags is set here; only restrict when exactly one of them is
    if include_communities and not include_groups:
        cluster_statement = cluster_statement.where(Cluster.is_official_cluster)
    elif include_groups and not include_communities:
        cluster_statement = cluster_statement.where(~Cluster.is_official_cluster)

    results = []
    for cluster, cluster_rank, cluster_snippet in execute_search_statement(session, cluster_statement):
        if cluster.is_official_cluster:
            community = community_to_pb(session, cluster.official_cluster_for_node, context)
            group = None
        else:
            community = None
            group = group_to_pb(session, cluster, context)
        results.append(
            search_pb2.Result(
                rank=cluster_rank,
                community=community,
                group=group,
                snippet=cluster_snippet,
            )
        )
    return results
341class Search(search_pb2_grpc.SearchServicer):
342 def Search(self, request, context, session):
343 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
344 # this is not an ideal page token, some results have equal rank (unlikely)
345 next_rank = float(request.page_token) if request.page_token else None
347 all_results = (
348 _search_users(
349 session,
350 request.query,
351 request.title_only,
352 next_rank,
353 page_size,
354 context,
355 request.include_users,
356 )
357 + _search_pages(
358 session,
359 request.query,
360 request.title_only,
361 next_rank,
362 page_size,
363 context,
364 request.include_places,
365 request.include_guides,
366 )
367 + _search_events(
368 session,
369 request.query,
370 request.title_only,
371 next_rank,
372 page_size,
373 context,
374 )
375 + _search_clusters(
376 session,
377 request.query,
378 request.title_only,
379 next_rank,
380 page_size,
381 context,
382 request.include_communities,
383 request.include_groups,
384 )
385 )
386 all_results.sort(key=lambda result: result.rank, reverse=True)
387 return search_pb2.SearchRes(
388 results=all_results[:page_size],
389 next_page_token=str(all_results[page_size].rank) if len(all_results) > page_size else None,
390 )
392 def UserSearch(self, request, context, session):
393 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
395 statement = select(User).where_users_visible(context)
396 if request.HasField("query"):
397 if request.query_name_only:
398 statement = statement.where(
399 or_(User.name.ilike(f"%{request.query.value}%"), User.username.ilike(f"%{request.query.value}%"))
400 )
401 else:
402 statement = statement.where(
403 or_(
404 User.name.ilike(f"%{request.query.value}%"),
405 User.username.ilike(f"%{request.query.value}%"),
406 User.city.ilike(f"%{request.query.value}%"),
407 User.hometown.ilike(f"%{request.query.value}%"),
408 User.about_me.ilike(f"%{request.query.value}%"),
409 User.things_i_like.ilike(f"%{request.query.value}%"),
410 User.about_place.ilike(f"%{request.query.value}%"),
411 User.additional_information.ilike(f"%{request.query.value}%"),
412 )
413 )
415 if request.HasField("last_active"):
416 raw_dt = to_aware_datetime(request.last_active)
417 statement = statement.where(User.last_active >= last_active_coarsen(raw_dt))
419 if len(request.gender) > 0:
420 if not has_strong_verification(session, user):
421 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NEED_STRONG_VERIFICATION)
422 elif user.gender not in request.gender:
423 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.MUST_INCLUDE_OWN_GENDER)
424 else:
425 statement = statement.where(User.gender.in_(request.gender))
427 if len(request.hosting_status_filter) > 0:
428 statement = statement.where(
429 User.hosting_status.in_([hostingstatus2sql[status] for status in request.hosting_status_filter])
430 )
431 if len(request.meetup_status_filter) > 0:
432 statement = statement.where(
433 User.meetup_status.in_([meetupstatus2sql[status] for status in request.meetup_status_filter])
434 )
435 if len(request.smoking_location_filter) > 0:
436 statement = statement.where(
437 User.smoking_allowed.in_([smokinglocation2sql[loc] for loc in request.smoking_location_filter])
438 )
439 if len(request.sleeping_arrangement_filter) > 0:
440 statement = statement.where(
441 User.sleeping_arrangement.in_(
442 [sleepingarrangement2sql[arr] for arr in request.sleeping_arrangement_filter]
443 )
444 )
445 if len(request.parking_details_filter) > 0:
446 statement = statement.where(
447 User.parking_details.in_([parkingdetails2sql[det] for det in request.parking_details_filter])
448 )
449 # limits/default could be handled on the front end as well
450 min_age = request.age_min.value if request.HasField("age_min") else 18
451 max_age = request.age_max.value if request.HasField("age_max") else 200
453 statement = statement.where((User.age >= min_age) & (User.age <= max_age))
455 # return results with by language code as only input
456 # fluency in conversational or fluent
458 if len(request.language_ability_filter) > 0:
459 language_options = []
460 for ability_filter in request.language_ability_filter:
461 fluency_sql_value = fluency2sql.get(ability_filter.fluency)
463 if fluency_sql_value is None:
464 continue
465 language_options.append(
466 and_(
467 (LanguageAbility.language_code == ability_filter.code),
468 (LanguageAbility.fluency >= (fluency_sql_value)),
469 )
470 )
471 statement = statement.join(LanguageAbility, LanguageAbility.user_id == User.id)
472 statement = statement.where(or_(*language_options))
474 if request.HasField("profile_completed"):
475 statement = statement.where(User.has_completed_profile == request.profile_completed.value)
476 if request.HasField("guests"):
477 statement = statement.where(User.max_guests >= request.guests.value)
478 if request.HasField("last_minute"):
479 statement = statement.where(User.last_minute == request.last_minute.value)
480 if request.HasField("has_pets"):
481 statement = statement.where(User.has_pets == request.has_pets.value)
482 if request.HasField("accepts_pets"):
483 statement = statement.where(User.accepts_pets == request.accepts_pets.value)
484 if request.HasField("has_kids"):
485 statement = statement.where(User.has_kids == request.has_kids.value)
486 if request.HasField("accepts_kids"):
487 statement = statement.where(User.accepts_kids == request.accepts_kids.value)
488 if request.HasField("has_housemates"):
489 statement = statement.where(User.has_housemates == request.has_housemates.value)
490 if request.HasField("wheelchair_accessible"):
491 statement = statement.where(User.wheelchair_accessible == request.wheelchair_accessible.value)
492 if request.HasField("smokes_at_home"):
493 statement = statement.where(User.smokes_at_home == request.smokes_at_home.value)
494 if request.HasField("drinking_allowed"):
495 statement = statement.where(User.drinking_allowed == request.drinking_allowed.value)
496 if request.HasField("drinks_at_home"):
497 statement = statement.where(User.drinks_at_home == request.drinks_at_home.value)
498 if request.HasField("parking"):
499 statement = statement.where(User.parking == request.parking.value)
500 if request.HasField("camping_ok"):
501 statement = statement.where(User.camping_ok == request.camping_ok.value)
503 if request.HasField("search_in_area"):
504 # EPSG4326 measures distance in decimal degress
505 # we want to check whether two circles overlap, so check if the distance between their centers is less
506 # than the sum of their radii, divided by 111111 m ~= 1 degree (at the equator)
507 search_point = create_coordinate(request.search_in_area.lat, request.search_in_area.lng)
508 statement = statement.where(
509 func.ST_DWithin(
510 # old:
511 # User.geom, search_point, (User.geom_radius + request.search_in_area.radius) / 111111
512 # this is an optimization that speeds up the db queries since it doesn't need to look up the user's geom radius
513 User.geom,
514 search_point,
515 (1000 + request.search_in_area.radius) / 111111,
516 )
517 )
518 if request.HasField("search_in_rectangle"):
519 statement = statement.where(
520 func.ST_Within(
521 User.geom,
522 func.ST_MakeEnvelope(
523 request.search_in_rectangle.lng_min,
524 request.search_in_rectangle.lat_min,
525 request.search_in_rectangle.lng_max,
526 request.search_in_rectangle.lat_max,
527 4326,
528 ),
529 )
530 )
531 if request.HasField("search_in_community_id"):
532 # could do a join here as well, but this is just simpler
533 node = session.execute(select(Node).where(Node.id == request.search_in_community_id)).scalar_one_or_none()
534 if not node:
535 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
536 statement = statement.where(func.ST_Contains(node.geom, User.geom))
538 if request.only_with_references:
539 references = (
540 select(Reference.to_user_id.label("user_id"))
541 .where_users_column_visible(context, Reference.from_user_id)
542 .distinct()
543 .subquery()
544 )
545 statement = statement.join(references, references.c.user_id == User.id)
547 if request.only_with_strong_verification:
548 statement = statement.join(
549 StrongVerificationAttempt,
550 and_(
551 StrongVerificationAttempt.user_id == User.id,
552 StrongVerificationAttempt.has_strong_verification(User),
553 ),
554 )
555 # TODO:
556 # bool friends_only = 13;
558 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
559 next_recommendation_score = float(decrypt_page_token(request.page_token)) if request.page_token else 1e10
560 total_items = session.execute(select(func.count()).select_from(statement.subquery())).scalar()
562 statement = (
563 statement.where(User.recommendation_score <= next_recommendation_score)
564 .order_by(User.recommendation_score.desc())
565 .limit(page_size + 1)
566 )
567 users = session.execute(statement).scalars().all()
569 return search_pb2.UserSearchRes(
570 results=[
571 search_pb2.Result(
572 rank=1,
573 user=user_model_to_pb(user, session, context),
574 )
575 for user in users[:page_size]
576 ],
577 next_page_token=(
578 encrypt_page_token(str(users[-1].recommendation_score)) if len(users) > page_size else None
579 ),
580 total_items=total_items,
581 )
583 def EventSearch(self, request, context, session):
584 statement = (
585 select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted)
586 )
588 if request.HasField("query"):
589 if request.query_title_only:
590 statement = statement.where(Event.title.ilike(f"%{request.query.value}%"))
591 else:
592 statement = statement.where(
593 or_(
594 Event.title.ilike(f"%{request.query.value}%"),
595 EventOccurrence.content.ilike(f"%{request.query.value}%"),
596 EventOccurrence.address.ilike(f"%{request.query.value}%"),
597 )
598 )
600 if request.only_online:
601 statement = statement.where(EventOccurrence.geom == None)
602 elif request.only_offline:
603 statement = statement.where(EventOccurrence.geom != None)
605 if request.subscribed or request.attending or request.organizing or request.my_communities:
606 where_ = []
608 if request.subscribed:
609 statement = statement.outerjoin(
610 EventSubscription,
611 and_(EventSubscription.event_id == Event.id, EventSubscription.user_id == context.user_id),
612 )
613 where_.append(EventSubscription.user_id != None)
614 if request.organizing:
615 statement = statement.outerjoin(
616 EventOrganizer,
617 and_(EventOrganizer.event_id == Event.id, EventOrganizer.user_id == context.user_id),
618 )
619 where_.append(EventOrganizer.user_id != None)
620 if request.attending:
621 statement = statement.outerjoin(
622 EventOccurrenceAttendee,
623 and_(
624 EventOccurrenceAttendee.occurrence_id == EventOccurrence.id,
625 EventOccurrenceAttendee.user_id == context.user_id,
626 ),
627 )
628 where_.append(EventOccurrenceAttendee.user_id != None)
629 if request.my_communities:
630 my_communities = (
631 session.execute(
632 select(Node.id)
633 .join(Cluster, Cluster.parent_node_id == Node.id)
634 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
635 .where(ClusterSubscription.user_id == context.user_id)
636 .where(Cluster.is_official_cluster)
637 .order_by(Node.id)
638 .limit(100000)
639 )
640 .scalars()
641 .all()
642 )
643 where_.append(Event.parent_node_id.in_(my_communities))
645 statement = statement.where(or_(*where_))
647 if not request.include_cancelled:
648 statement = statement.where(~EventOccurrence.is_cancelled)
650 if request.HasField("search_in_area"):
651 # EPSG4326 measures distance in decimal degress
652 # we want to check whether two circles overlap, so check if the distance between their centers is less
653 # than the sum of their radii, divided by 111111 m ~= 1 degree (at the equator)
654 search_point = create_coordinate(request.search_in_area.lat, request.search_in_area.lng)
655 statement = statement.where(
656 func.ST_DWithin(
657 # old:
658 # User.geom, search_point, (User.geom_radius + request.search_in_area.radius) / 111111
659 # this is an optimization that speeds up the db queries since it doesn't need to look up the user's geom radius
660 EventOccurrence.geom,
661 search_point,
662 (1000 + request.search_in_area.radius) / 111111,
663 )
664 )
665 if request.HasField("search_in_rectangle"):
666 statement = statement.where(
667 func.ST_Within(
668 EventOccurrence.geom,
669 func.ST_MakeEnvelope(
670 request.search_in_rectangle.lng_min,
671 request.search_in_rectangle.lat_min,
672 request.search_in_rectangle.lng_max,
673 request.search_in_rectangle.lat_max,
674 4326,
675 ),
676 )
677 )
678 if request.HasField("search_in_community_id"):
679 # could do a join here as well, but this is just simpler
680 node = session.execute(select(Node).where(Node.id == request.search_in_community_id)).scalar_one_or_none()
681 if not node:
682 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
683 statement = statement.where(func.ST_Contains(node.geom, EventOccurrence.geom))
685 if request.HasField("after"):
686 statement = statement.where(EventOccurrence.start_time > to_aware_datetime(request.after))
687 if request.HasField("before"):
688 statement = statement.where(EventOccurrence.end_time < to_aware_datetime(request.before))
690 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
691 # the page token is a unix timestamp of where we left off
692 page_token = (
693 dt_from_millis(int(request.page_token)) if request.page_token and not request.page_number else now()
694 )
695 page_number = request.page_number or 1
696 # Calculate the offset for pagination
697 offset = (page_number - 1) * page_size
699 if not request.past:
700 statement = statement.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
701 EventOccurrence.start_time.asc()
702 )
703 else:
704 statement = statement.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
705 EventOccurrence.start_time.desc()
706 )
708 total_items = session.execute(select(func.count()).select_from(statement.subquery())).scalar()
709 # Apply pagination by page number
710 statement = statement.offset(offset).limit(page_size) if request.page_number else statement.limit(page_size + 1)
711 occurrences = session.execute(statement).scalars().all()
713 return search_pb2.EventSearchRes(
714 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
715 next_page_token=(str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None),
716 total_items=total_items,
717 )