Coverage for src/couchers/servicers/search.py: 85%
271 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-08-28 14:55 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-08-28 14:55 +0000
1"""
2See //docs/search.md for overview.
3"""
5from datetime import timedelta
7import grpc
8from sqlalchemy.sql import and_, func, or_
10from couchers import errors, urls
11from couchers.crypto import decrypt_page_token, encrypt_page_token
12from couchers.helpers.strong_verification import has_strong_verification
13from couchers.materialized_views import LiteUser, UserResponseRate
14from couchers.models import (
15 Cluster,
16 ClusterSubscription,
17 Event,
18 EventOccurrence,
19 EventOccurrenceAttendee,
20 EventOrganizer,
21 EventSubscription,
22 LanguageAbility,
23 Node,
24 Page,
25 PageType,
26 PageVersion,
27 Reference,
28 StrongVerificationAttempt,
29 User,
30)
31from couchers.reranker import reranker
32from couchers.servicers.api import (
33 fluency2sql,
34 get_num_references,
35 hostingstatus2api,
36 hostingstatus2sql,
37 meetupstatus2api,
38 meetupstatus2sql,
39 parkingdetails2sql,
40 response_rate_to_pb,
41 sleepingarrangement2sql,
42 smokinglocation2sql,
43 user_model_to_pb,
44)
45from couchers.servicers.communities import community_to_pb
46from couchers.servicers.events import event_to_pb
47from couchers.servicers.groups import group_to_pb
48from couchers.servicers.pages import page_to_pb
49from couchers.sql import couchers_select as select
50from couchers.utils import (
51 Timestamp_from_datetime,
52 create_coordinate,
53 dt_from_millis,
54 get_coordinates,
55 last_active_coarsen,
56 millis_from_dt,
57 now,
58 to_aware_datetime,
59)
60from proto import search_pb2, search_pb2_grpc
# Searches are a bit expensive, so we'd rather send back a bunch of results at once than lots of small pages.
# The search endpoints in this module cap the requested page size at this value and also use it as the default.
MAX_PAGINATION_LENGTH = 100

# Text search configuration name passed to to_tsvector, websearch_to_tsquery and ts_headline.
REGCONFIG = "english"
# A row matches on title similarity alone when its trigram word similarity to the query exceeds this value.
TRI_SIMILARITY_THRESHOLD = 0.6
# Multiplier applied to the trigram similarity before it is added to ts_rank_cd to form the final rank.
TRI_SIMILARITY_WEIGHT = 5
70def _join_with_space(coalesces):
71 # the objects in coalesces are not strings, so we can't do " ".join(coalesces). They're SQLAlchemy magic.
72 if not coalesces:
73 return ""
74 out = coalesces[0]
75 for coalesce in coalesces[1:]:
76 out += " " + coalesce
77 return out
def _build_tsv(A, B=None, C=None, D=None):
    """
    Builds one weighted tsvector expression out of up to four lists of columns.

    Each list is coalesced, space-joined, run through to_tsvector and tagged with the weight letter matching its
    parameter name. A is always included; B, C and D are only appended when they are non-empty.
    """

    def weighted(fields, weight):
        text = _join_with_space([func.coalesce(field, "") for field in fields])
        return func.setweight(func.to_tsvector(REGCONFIG, text), weight)

    tsv = weighted(A, "A")
    for fields, weight in ((B, "B"), (C, "C"), (D, "D")):
        # None and [] both mean "no columns for this weight"
        if fields:
            tsv = tsv.concat(weighted(fields, weight))
    return tsv
def _build_doc(A, B=None, C=None, D=None):
    """
    Builds the plain document expression (no to_tsvector, no weights) that snippets are extracted from.

    The columns of A come first, followed by those of each non-empty B, C and D, all separated by single spaces.
    """

    def flatten(fields):
        return _join_with_space([func.coalesce(field, "") for field in fields])

    doc = flatten(A)
    for fields in (B, C, D):
        # None and [] both mean "nothing to append"
        if fields:
            doc = doc + (" " + flatten(fields))
    return doc
def _similarity(statement, text):
    """
    Returns a word_similarity(...) SQL expression comparing the unaccented search string with the unaccented text.
    """
    needle = func.unaccent(statement)
    haystack = func.unaccent(text)
    return func.word_similarity(needle, haystack)
def _gen_search_elements(statement, title_only, next_rank, page_size, A, B=None, C=None, D=None):
    """
    Generates the postgres full text search expressions for a search string and four sets of fields (A, B, C, D).

    The four sets are in decreasing order of "importance" for ranking. A should be the "title", the others can be
    anything.

    If title_only is true, rows are matched and ranked purely by trigram similarity against A. Otherwise rows match
    when either the tsvector built from all four sets matches the query or the title similarity is high enough, and
    the rank is the weighted similarity plus ts_rank_cd.

    Returns a tuple (rank, snippet, execute_search_statement): two labelled column expressions to select, and a
    function (session, orig_statement) -> rows that applies the match filter, the optional `rank <= next_rank`
    pagination cut-off, descending rank ordering and a limit of page_size + 1.
    """
    B = B or []
    C = C or []
    D = D or []

    # pieces shared by both modes: the parsed query, the headline source and the title similarity
    tsq = func.websearch_to_tsquery(REGCONFIG, statement)
    doc = _build_doc(A, B, C, D)
    sim = _similarity(statement, _build_doc(A))

    # the snippet with results highlighted
    snippet = func.ts_headline(REGCONFIG, doc, tsq, "StartSel=**,StopSel=**").label("snippet")

    if title_only:
        rank = sim.label("rank")
        match_clause = sim > TRI_SIMILARITY_THRESHOLD
    else:
        # the tsvector that the tsquery is matched against
        tsv = _build_tsv(A, B, C, D)
        # weigh the similarity a lot, the text-based ranking less
        rank = (TRI_SIMILARITY_WEIGHT * sim + func.ts_rank_cd(tsv, tsq)).label("rank")
        match_clause = or_(tsv.op("@@")(tsq), sim > TRI_SIMILARITY_THRESHOLD)

    def execute_search_statement(session, orig_statement):
        """
        Does the right search filtering, limiting, and ordering for the initial statement
        """
        filtered = orig_statement.where(match_clause)
        filtered = filtered.where(rank <= next_rank if next_rank is not None else True)
        return session.execute(filtered.order_by(rank.desc()).limit(page_size + 1)).all()

    return rank, snippet, execute_search_statement
def _search_users(session, search_statement, title_only, next_rank, page_size, context, include_users):
    """
    Searches users visible to the caller and returns a list of search_pb2.Result with the `user` field set.

    Returns an empty list without building a query when include_users is falsy.
    """
    if not include_users:
        return []

    rank, snippet, execute_search_statement = _gen_search_elements(
        search_statement,
        title_only,
        next_rank,
        page_size,
        A=[User.username, User.name],
        B=[User.city],
        C=[User.about_me],
        D=[User.things_i_like, User.about_place, User.additional_information],
    )

    visible_users = select(User, rank, snippet).where_users_visible(context)
    rows = execute_search_statement(session, visible_users)

    return [
        search_pb2.Result(
            rank=row_rank,
            user=user_model_to_pb(matched_user, session, context),
            snippet=row_snippet,
        )
        for matched_user, row_rank, row_snippet in rows
    ]
def _search_pages(session, search_statement, title_only, next_rank, page_size, context, include_places, include_guides):
    """
    Searches the latest version of place and/or guide pages.

    Returns a list of search_pb2.Result where `place` is set for place pages and `guide` is set for guide pages.
    Returns an empty list when neither include_places nor include_guides is set.
    """
    # bail out before building any SQL expressions, same as the other _search_* helpers
    if not include_places and not include_guides:
        return []

    rank, snippet, execute_search_statement = _gen_search_elements(
        search_statement,
        title_only,
        next_rank,
        page_size,
        [PageVersion.title],
        [PageVersion.address],
        [],
        [PageVersion.content],
    )

    # only real column comparisons go into or_(); the guard above guarantees at least one
    wanted_types = []
    if include_places:
        wanted_types.append(Page.type == PageType.place)
    if include_guides:
        wanted_types.append(Page.type == PageType.guide)

    # highest PageVersion id per page, restricted to the requested page types
    latest_pages = (
        select(func.max(PageVersion.id).label("id"))
        .join(Page, Page.id == PageVersion.page_id)
        .where(or_(*wanted_types))
        .group_by(PageVersion.page_id)
        .subquery()
    )

    pages = execute_search_statement(
        session,
        select(Page, rank, snippet)
        .join(PageVersion, PageVersion.page_id == Page.id)
        .join(latest_pages, latest_pages.c.id == PageVersion.id),
    )

    return [
        search_pb2.Result(
            rank=row_rank,
            place=page_to_pb(session, page, context) if page.type == PageType.place else None,
            guide=page_to_pb(session, page, context) if page.type == PageType.guide else None,
            snippet=row_snippet,
        )
        for page, row_rank, row_snippet in pages
    ]
def _search_events(session, search_statement, title_only, next_rank, page_size, context):
    """
    Searches event occurrences whose end time has not passed yet.

    Returns a list of search_pb2.Result with the `event` field set.
    """
    rank, snippet, execute_search_statement = _gen_search_elements(
        search_statement,
        title_only,
        next_rank,
        page_size,
        A=[Event.title],
        B=[EventOccurrence.address, EventOccurrence.link],
        C=[],
        D=[EventOccurrence.content],
    )

    not_yet_ended = (
        select(EventOccurrence, rank, snippet)
        .join(Event, Event.id == EventOccurrence.event_id)
        .where(EventOccurrence.end_time >= func.now())
    )
    rows = execute_search_statement(session, not_yet_ended)

    return [
        search_pb2.Result(
            rank=row_rank,
            event=event_to_pb(session, occurrence, context),
            snippet=row_snippet,
        )
        for occurrence, row_rank, row_snippet in rows
    ]
def _search_clusters(
    session, search_statement, title_only, next_rank, page_size, context, include_communities, include_groups
):
    """
    Searches clusters together with the latest version of their main page.

    Official clusters are returned as `community` results, all other clusters as `group` results. When only one of
    include_communities / include_groups is set the query is restricted to that kind; when neither is set an empty
    list is returned.
    """
    if not include_communities and not include_groups:
        return []

    rank, snippet, execute_search_statement = _gen_search_elements(
        search_statement,
        title_only,
        next_rank,
        page_size,
        A=[Cluster.name],
        B=[PageVersion.address, PageVersion.title],
        C=[Cluster.description],
        D=[PageVersion.content],
    )

    # highest PageVersion id per main page
    latest_pages = (
        select(func.max(PageVersion.id).label("id"))
        .join(Page, Page.id == PageVersion.page_id)
        .where(Page.type == PageType.main_page)
        .group_by(PageVersion.page_id)
        .subquery()
    )

    # restrict to one kind of cluster only when exactly one kind was asked for
    if include_communities and not include_groups:
        kind_filter = Cluster.is_official_cluster
    elif include_groups and not include_communities:
        kind_filter = ~Cluster.is_official_cluster
    else:
        kind_filter = True

    query = (
        select(Cluster, rank, snippet)
        .join(Page, Page.owner_cluster_id == Cluster.id)
        .join(PageVersion, PageVersion.page_id == Page.id)
        .join(latest_pages, latest_pages.c.id == PageVersion.id)
        .where(kind_filter)
    )
    rows = execute_search_statement(session, query)

    results = []
    for cluster, row_rank, row_snippet in rows:
        if cluster.is_official_cluster:
            community = community_to_pb(session, cluster.official_cluster_for_node, context)
            group = None
        else:
            community = None
            group = group_to_pb(session, cluster, context)
        results.append(
            search_pb2.Result(
                rank=row_rank,
                community=community,
                group=group,
                snippet=row_snippet,
            )
        )
    return results
def _user_search_inner(request, context, session):
    """
    Builds and runs the filtered user search shared by the user search endpoints.

    Only users that are visible to the caller and present in LiteUser are considered. If request.exactly_user_ids
    is non-empty, it is the only filter applied; otherwise every filter present on the request is ANDed together.

    Results are ordered by descending recommendation score. The page token is the encrypted recommendation score
    at which the next page starts.

    Returns a tuple (user_ids, next_page_token, total_items): the ids for this page (at most page_size), the token
    for the next page or None when there is none, and the number of rows matching the filters before pagination.

    Aborts the call through context.abort with FAILED_PRECONDITION when a gender filter is used without strong
    verification or without including the caller's own gender, and with NOT_FOUND when search_in_community_id does
    not match a node.
    """
    user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()

    # Base statement with visibility filter
    statement = select(User.id, User.recommendation_score).where_users_visible(context)
    # make sure that only users who are in LiteUser show up
    statement = statement.join(LiteUser, LiteUser.id == User.id)

    # If exactly_user_ids is present, only filter by those IDs and ignore all other filters
    # This is a bit of a hacky feature to help with the frontend map implementation
    if len(request.exactly_user_ids) > 0:
        statement = statement.where(User.id.in_(request.exactly_user_ids))
    else:
        # Apply all the normal filters
        if request.HasField("query"):
            pattern = f"%{request.query.value}%"
            searched_columns = [User.name, User.username]
            if not request.query_name_only:
                searched_columns += [
                    User.city,
                    User.hometown,
                    User.about_me,
                    User.things_i_like,
                    User.about_place,
                    User.additional_information,
                ]
            statement = statement.where(or_(*[column.ilike(pattern) for column in searched_columns]))

        if request.HasField("last_active"):
            raw_dt = to_aware_datetime(request.last_active)
            statement = statement.where(User.last_active >= last_active_coarsen(raw_dt))

        if len(request.gender) > 0:
            if not has_strong_verification(session, user):
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NEED_STRONG_VERIFICATION)
            elif user.gender not in request.gender:
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.MUST_INCLUDE_OWN_GENDER)
            else:
                statement = statement.where(User.gender.in_(request.gender))

        # repeated enum filters: (requested API values, column to filter, API -> SQL value mapping)
        enum_filters = (
            (request.hosting_status_filter, User.hosting_status, hostingstatus2sql),
            (request.meetup_status_filter, User.meetup_status, meetupstatus2sql),
            (request.smoking_location_filter, User.smoking_allowed, smokinglocation2sql),
            (request.sleeping_arrangement_filter, User.sleeping_arrangement, sleepingarrangement2sql),
            (request.parking_details_filter, User.parking_details, parkingdetails2sql),
        )
        for requested_values, column, api_to_sql in enum_filters:
            if len(requested_values) > 0:
                statement = statement.where(column.in_([api_to_sql[value] for value in requested_values]))

        # limits/default could be handled on the front end as well
        min_age = request.age_min.value if request.HasField("age_min") else 18
        max_age = request.age_max.value if request.HasField("age_max") else 200
        statement = statement.where((User.age >= min_age) & (User.age <= max_age))

        if len(request.language_ability_filter) > 0:
            language_options = []
            for ability_filter in request.language_ability_filter:
                fluency_sql_value = fluency2sql.get(ability_filter.fluency)
                # entries whose fluency has no SQL equivalent are skipped
                if fluency_sql_value is None:
                    continue
                language_options.append(
                    and_(
                        LanguageAbility.language_code == ability_filter.code,
                        LanguageAbility.fluency >= fluency_sql_value,
                    )
                )
            # With no usable entries there is nothing to filter on (and or_() must not be called without arguments).
            if language_options:
                # Join against the distinct set of matching user ids rather than the LanguageAbility rows
                # themselves, so a user matching several requested languages is returned (and counted) once.
                language_matches = (
                    select(LanguageAbility.user_id.label("user_id"))
                    .where(or_(*language_options))
                    .distinct()
                    .subquery()
                )
                statement = statement.join(language_matches, language_matches.c.user_id == User.id)

        if request.HasField("profile_completed"):
            statement = statement.where(User.has_completed_profile == request.profile_completed.value)
        if request.HasField("guests"):
            statement = statement.where(User.max_guests >= request.guests.value)

        # optional boolean-style filters that are compared for equality: (request field name, column)
        equality_filters = (
            ("last_minute", User.last_minute),
            ("has_pets", User.has_pets),
            ("accepts_pets", User.accepts_pets),
            ("has_kids", User.has_kids),
            ("accepts_kids", User.accepts_kids),
            ("has_housemates", User.has_housemates),
            ("wheelchair_accessible", User.wheelchair_accessible),
            ("smokes_at_home", User.smokes_at_home),
            ("drinking_allowed", User.drinking_allowed),
            ("drinks_at_home", User.drinks_at_home),
            ("parking", User.parking),
            ("camping_ok", User.camping_ok),
        )
        for field_name, column in equality_filters:
            if request.HasField(field_name):
                statement = statement.where(column == getattr(request, field_name).value)

        if request.HasField("search_in_area"):
            # EPSG4326 measures distance in decimal degrees
            # we want to check whether two circles overlap, so check if the distance between their centers is less
            # than the sum of their radii, divided by 111111 m ~= 1 degree (at the equator)
            search_point = create_coordinate(request.search_in_area.lat, request.search_in_area.lng)
            statement = statement.where(
                func.ST_DWithin(
                    # old:
                    # User.geom, search_point, (User.geom_radius + request.search_in_area.radius) / 111111
                    # this is an optimization that speeds up the db queries since it doesn't need to look up the
                    # user's geom radius
                    User.geom,
                    search_point,
                    (1000 + request.search_in_area.radius) / 111111,
                )
            )
        if request.HasField("search_in_rectangle"):
            statement = statement.where(
                func.ST_Within(
                    User.geom,
                    func.ST_MakeEnvelope(
                        request.search_in_rectangle.lng_min,
                        request.search_in_rectangle.lat_min,
                        request.search_in_rectangle.lng_max,
                        request.search_in_rectangle.lat_max,
                        4326,
                    ),
                )
            )
        if request.HasField("search_in_community_id"):
            # could do a join here as well, but this is just simpler
            node = session.execute(select(Node).where(Node.id == request.search_in_community_id)).scalar_one_or_none()
            if not node:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
            statement = statement.where(func.ST_Contains(node.geom, User.geom))

        if request.only_with_references:
            references = (
                select(Reference.to_user_id.label("user_id"))
                .where_users_column_visible(context, Reference.from_user_id)
                .distinct()
                .subquery()
            )
            statement = statement.join(references, references.c.user_id == User.id)

        if request.only_with_strong_verification:
            statement = statement.join(
                StrongVerificationAttempt,
                and_(
                    StrongVerificationAttempt.user_id == User.id,
                    StrongVerificationAttempt.has_strong_verification(User),
                ),
            )
        # TODO:
        # bool friends_only = 13;

    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    next_recommendation_score = float(decrypt_page_token(request.page_token)) if request.page_token else 1e10
    # counted before the pagination cut-off is applied
    total_items = session.execute(select(func.count()).select_from(statement.subquery())).scalar()

    statement = (
        statement.where(User.recommendation_score <= next_recommendation_score)
        .order_by(User.recommendation_score.desc())
        .limit(page_size + 1)
    )
    rows = session.execute(statement).all()

    user_ids = [user_id for user_id, _ in rows]
    # one extra row was requested; if it came back, its score is where the next page starts
    next_page_token = encrypt_page_token(str(rows[-1][1])) if len(rows) > page_size else None
    return user_ids[:page_size], next_page_token, total_items
class Search(search_pb2_grpc.SearchServicer):
    """
    Servicer for the search endpoints: combined full text search, user search (two response formats) and event
    search.
    """

    def Search(self, request, context, session):
        """
        Full text search across users, pages, events and clusters, merged into one list ordered by descending rank.

        The page token is the rank of the first result of the next page.
        """
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        # this is not an ideal page token, some results have equal rank (unlikely)
        next_rank = float(request.page_token) if request.page_token else None

        # positional arguments every _search_* helper starts with
        shared_args = (session, request.query, request.title_only, next_rank, page_size, context)

        combined = [
            *_search_users(*shared_args, request.include_users),
            *_search_pages(*shared_args, request.include_places, request.include_guides),
            *_search_events(*shared_args),
            *_search_clusters(*shared_args, request.include_communities, request.include_groups),
        ]
        ranked = sorted(combined, key=lambda result: result.rank, reverse=True)

        has_more = len(ranked) > page_size
        return search_pb2.SearchRes(
            results=ranked[:page_size],
            next_page_token=str(ranked[page_size].rank) if has_more else None,
        )

    def UserSearch(self, request, context, session):
        """
        Filtered user search returning full user messages, in the order produced by _user_search_inner.
        """
        user_ids_to_return, next_page_token, total_items = _user_search_inner(request, context, session)

        user_rows = session.execute(select(User.id, User).where(User.id.in_(user_ids_to_return))).all()
        users_by_id = {user_id: user for user_id, user in user_rows}

        results = []
        for user_id in user_ids_to_return:
            user_pb = user_model_to_pb(users_by_id[user_id], session, context)
            results.append(search_pb2.Result(rank=1, user=user_pb))

        return search_pb2.UserSearchRes(
            results=results,
            next_page_token=next_page_token,
            total_items=total_items,
        )

    def UserSearchV2(self, request, context, session):
        """
        Filtered user search returning SearchUser messages built from LiteUser, UserResponseRate and a handful of
        User columns. The results are passed through the reranker before being returned.
        """
        user_ids_to_return, next_page_token, total_items = _user_search_inner(request, context, session)

        lite_users = session.execute(select(LiteUser).where(LiteUser.id.in_(user_ids_to_return))).scalars().all()
        lite_user_by_id = {lite_user.id: lite_user for lite_user in lite_users}

        response_rates = (
            session.execute(select(UserResponseRate).where(UserResponseRate.user_id.in_(user_ids_to_return)))
            .scalars()
            .all()
        )
        response_rate_by_id = {resp_rate.user_id: resp_rate for resp_rate in response_rates}

        # the columns that are read from User directly rather than from LiteUser
        user_rows = session.execute(
            select(
                User.id,
                User.about_me,
                User.gender,
                User.last_active,
                User.hosting_status,
                User.meetup_status,
                User.joined,
            ).where(User.id.in_(user_ids_to_return))
        ).all()
        user_fields_by_id = {user_id: user_fields for user_id, *user_fields in user_rows}

        ref_counts_by_user_id = get_num_references(session, user_ids_to_return)

        def build_search_user(user_id):
            lite_user = lite_user_by_id[user_id]
            about_me, gender, last_active, hosting_status, meetup_status, joined = user_fields_by_id[user_id]
            lat, lng = get_coordinates(lite_user.geom)

            if lite_user.avatar_filename:
                avatar_url = urls.media_url(filename=lite_user.avatar_filename, size="full")
                avatar_thumbnail_url = urls.media_url(filename=lite_user.avatar_filename, size="thumbnail")
            else:
                avatar_url = None
                avatar_thumbnail_url = None

            return search_pb2.SearchUser(
                user_id=lite_user.id,
                username=lite_user.username,
                name=lite_user.name,
                city=lite_user.city,
                joined=Timestamp_from_datetime(last_active_coarsen(joined)),
                has_completed_profile=lite_user.has_completed_profile,
                has_completed_my_home=lite_user.has_completed_my_home,
                lat=lat,
                lng=lng,
                profile_snippet=about_me,
                num_references=ref_counts_by_user_id.get(lite_user.id, 0),
                gender=gender,
                age=int(lite_user.age),
                last_active=Timestamp_from_datetime(last_active_coarsen(last_active)),
                hosting_status=hostingstatus2api[hosting_status],
                meetup_status=meetupstatus2api[meetup_status],
                avatar_url=avatar_url,
                avatar_thumbnail_url=avatar_thumbnail_url,
                has_strong_verification=lite_user.has_strong_verification,
                **response_rate_to_pb(response_rate_by_id.get(user_id)),
            )

        results = reranker([build_search_user(user_id) for user_id in user_ids_to_return])

        return search_pb2.UserSearchV2Res(
            results=results,
            next_page_token=next_page_token,
            total_items=total_items,
        )

    def EventSearch(self, request, context, session):
        """
        Filtered search over non-deleted event occurrences.

        Supports two pagination modes: page tokens (a unix timestamp in milliseconds of where we left off) and,
        when request.page_number is set, plain offset pagination. Upcoming events are ordered by ascending start
        time; with request.past set, events are ordered by descending start time.
        """
        statement = (
            select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted)
        )

        if request.HasField("query"):
            pattern = f"%{request.query.value}%"
            if request.query_title_only:
                text_filter = Event.title.ilike(pattern)
            else:
                text_filter = or_(
                    Event.title.ilike(pattern),
                    EventOccurrence.content.ilike(pattern),
                    EventOccurrence.address.ilike(pattern),
                )
            statement = statement.where(text_filter)

        # online events are the ones without a geom
        if request.only_online:
            statement = statement.where(EventOccurrence.geom.is_(None))
        elif request.only_offline:
            statement = statement.where(EventOccurrence.geom.is_not(None))

        if request.subscribed or request.attending or request.organizing or request.my_communities:
            # the relationship filters are ORed together: an occurrence matching any requested one is kept
            relationship_clauses = []

            if request.subscribed:
                statement = statement.outerjoin(
                    EventSubscription,
                    and_(EventSubscription.event_id == Event.id, EventSubscription.user_id == context.user_id),
                )
                relationship_clauses.append(EventSubscription.user_id.is_not(None))
            if request.organizing:
                statement = statement.outerjoin(
                    EventOrganizer,
                    and_(EventOrganizer.event_id == Event.id, EventOrganizer.user_id == context.user_id),
                )
                relationship_clauses.append(EventOrganizer.user_id.is_not(None))
            if request.attending:
                statement = statement.outerjoin(
                    EventOccurrenceAttendee,
                    and_(
                        EventOccurrenceAttendee.occurrence_id == EventOccurrence.id,
                        EventOccurrenceAttendee.user_id == context.user_id,
                    ),
                )
                relationship_clauses.append(EventOccurrenceAttendee.user_id.is_not(None))
            if request.my_communities:
                # nodes whose official cluster the caller is subscribed to
                community_query = (
                    select(Node.id)
                    .join(Cluster, Cluster.parent_node_id == Node.id)
                    .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
                    .where(ClusterSubscription.user_id == context.user_id)
                    .where(Cluster.is_official_cluster)
                    .order_by(Node.id)
                    .limit(100000)
                )
                my_communities = session.execute(community_query).scalars().all()
                relationship_clauses.append(Event.parent_node_id.in_(my_communities))

            statement = statement.where(or_(*relationship_clauses))

        if not request.include_cancelled:
            statement = statement.where(~EventOccurrence.is_cancelled)

        if request.HasField("search_in_area"):
            # EPSG4326 measures distance in decimal degrees, so the distance in metres is divided by
            # 111111 m ~= 1 degree (at the equator); the requested radius is padded by a fixed 1000 m
            search_point = create_coordinate(request.search_in_area.lat, request.search_in_area.lng)
            max_distance = (1000 + request.search_in_area.radius) / 111111
            statement = statement.where(func.ST_DWithin(EventOccurrence.geom, search_point, max_distance))
        if request.HasField("search_in_rectangle"):
            rectangle = request.search_in_rectangle
            envelope = func.ST_MakeEnvelope(
                rectangle.lng_min,
                rectangle.lat_min,
                rectangle.lng_max,
                rectangle.lat_max,
                4326,
            )
            statement = statement.where(func.ST_Within(EventOccurrence.geom, envelope))
        if request.HasField("search_in_community_id"):
            # could do a join here as well, but this is just simpler
            node = session.execute(select(Node).where(Node.id == request.search_in_community_id)).scalar_one_or_none()
            if not node:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
            statement = statement.where(func.ST_Contains(node.geom, EventOccurrence.geom))

        if request.HasField("after"):
            statement = statement.where(EventOccurrence.start_time > to_aware_datetime(request.after))
        if request.HasField("before"):
            statement = statement.where(EventOccurrence.end_time < to_aware_datetime(request.before))

        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        # the page token is a unix timestamp of where we left off; it is ignored when paginating by page number
        if request.page_token and not request.page_number:
            cursor = dt_from_millis(int(request.page_token))
        else:
            cursor = now()
        # offset used when paginating by page number
        page_number = request.page_number or 1
        offset = (page_number - 1) * page_size

        one_second = timedelta(seconds=1)
        if request.past:
            statement = statement.where(EventOccurrence.end_time < cursor + one_second).order_by(
                EventOccurrence.start_time.desc()
            )
        else:
            statement = statement.where(EventOccurrence.end_time > cursor - one_second).order_by(
                EventOccurrence.start_time.asc()
            )

        total_items = session.execute(select(func.count()).select_from(statement.subquery())).scalar()

        if request.page_number:
            # Apply pagination by page number
            statement = statement.offset(offset).limit(page_size)
        else:
            # fetch one extra row so we can tell whether there is a next page
            statement = statement.limit(page_size + 1)
        occurrences = session.execute(statement).scalars().all()

        if len(occurrences) > page_size:
            next_page_token = str(millis_from_dt(occurrences[-1].end_time))
        else:
            next_page_token = None

        return search_pb2.EventSearchRes(
            events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
            next_page_token=next_page_token,
            total_items=total_items,
        )