Coverage for src/couchers/servicers/search.py: 84%

247 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-04-16 15:13 +0000

1""" 

2See //docs/search.md for overview. 

3""" 

4 

5from datetime import timedelta 

6 

7import grpc 

8from sqlalchemy.sql import and_, func, or_ 

9 

10from couchers import errors 

11from couchers.crypto import decrypt_page_token, encrypt_page_token 

12from couchers.models import ( 

13 Cluster, 

14 ClusterSubscription, 

15 Event, 

16 EventOccurrence, 

17 EventOccurrenceAttendee, 

18 EventOrganizer, 

19 EventSubscription, 

20 LanguageAbility, 

21 Node, 

22 Page, 

23 PageType, 

24 PageVersion, 

25 Reference, 

26 StrongVerificationAttempt, 

27 User, 

28) 

29from couchers.servicers.account import has_strong_verification 

30from couchers.servicers.api import ( 

31 fluency2sql, 

32 hostingstatus2sql, 

33 meetupstatus2sql, 

34 parkingdetails2sql, 

35 sleepingarrangement2sql, 

36 smokinglocation2sql, 

37 user_model_to_pb, 

38) 

39from couchers.servicers.communities import community_to_pb 

40from couchers.servicers.events import event_to_pb 

41from couchers.servicers.groups import group_to_pb 

42from couchers.servicers.pages import page_to_pb 

43from couchers.sql import couchers_select as select 

44from couchers.utils import ( 

45 create_coordinate, 

46 dt_from_millis, 

47 last_active_coarsen, 

48 millis_from_dt, 

49 now, 

50 to_aware_datetime, 

51) 

52from proto import search_pb2, search_pb2_grpc 

53 

# searches are a bit expensive, we'd rather send back a bunch of results at once than lots of small pages
MAX_PAGINATION_LENGTH = 100

# text search configuration name handed to to_tsvector, websearch_to_tsquery, and ts_headline below
REGCONFIG = "english"
# a row matches on its title when the word_similarity with the query is strictly above this value
TRI_SIMILARITY_THRESHOLD = 0.6
# multiplier applied to the title similarity before it is added to ts_rank_cd in the combined rank
TRI_SIMILARITY_WEIGHT = 5

60 

61 

62def _join_with_space(coalesces): 

63 # the objects in coalesces are not strings, so we can't do " ".join(coalesces). They're SQLAlchemy magic. 

64 if not coalesces: 

65 return "" 

66 out = coalesces[0] 

67 for coalesce in coalesces[1:]: 

68 out += " " + coalesce 

69 return out 

70 

71 

def _build_tsv(A, B=None, C=None, D=None):
    """
    Builds a weighted tsvector expression out of up to four lists of fields.

    Every list has its members coalesced to "", joined with spaces, converted with to_tsvector using REGCONFIG, and
    tagged via setweight with the weight of the same name ("A" to "D"). A always contributes; B, C, and D are only
    concatenated on when they are non-empty.
    """

    def weighted_vector(bits, weight):
        text = _join_with_space([func.coalesce(bit, "") for bit in bits])
        return func.setweight(func.to_tsvector(REGCONFIG, text), weight)

    tsv = weighted_vector(A, "A")
    for bits, weight in ((B, "B"), (C, "C"), (D, "D")):
        # None and [] both mean "no fields at this weight"
        if bits:
            tsv = tsv.concat(weighted_vector(bits, weight))
    return tsv

93 

94 

def _build_doc(A, B=None, C=None, D=None):
    """
    Builds the raw document (no to_tsvector, no weighting) that the highlighted snippet is extracted from.

    The fields of A, then of each non-empty B, C, and D, are coalesced to "" and joined with single spaces.
    """
    doc = _join_with_space([func.coalesce(bit, "") for bit in A])
    for bits in (B, C, D):
        # None and [] both mean "nothing to append for this set"
        if bits:
            doc += " " + _join_with_space([func.coalesce(bit, "") for bit in bits])
    return doc

110 

111 

def _similarity(statement, text):
    """
    Returns a word_similarity() expression comparing the two arguments after passing each through unaccent().
    """
    unaccented_statement = func.unaccent(statement)
    unaccented_text = func.unaccent(text)
    return func.word_similarity(unaccented_statement, unaccented_text)

114 

115 

def _gen_search_elements(statement, title_only, next_rank, page_size, A, B=None, C=None, D=None):
    """
    Given a search string and four sets of fields, (A, B, C, D), generates a bunch of postgres expressions for full
    text search.

    The four sets are in decreasing order of "importance" for ranking. A should be the "title", the others can be
    anything.

    If title_only=True, matching and ranking rely only on the trigram similarity against A; otherwise the tsvector
    built from all four sets takes part in both as well. The snippet is generated from all four sets in either mode.

    Returns a tuple (rank, snippet, execute_search_statement): the "rank" and "snippet" labelled expressions to put
    in a select, and a function taking (session, orig_statement) that applies the search filter, the optional
    `rank <= next_rank` cutoff, descending rank order, and a limit of page_size + 1, then returns all rows.
    """
    B = B or []
    C = C or []
    D = D or []

    # a postgres tsquery object; matched against the tsvector in the full search, and always used for the headline
    tsq = func.websearch_to_tsquery(REGCONFIG, statement)

    # document to generate snippet from
    doc = _build_doc(A, B, C, D)

    # trigram based text similarity between the title and the search string
    sim = _similarity(statement, _build_doc(A))

    # the snippet with results highlighted
    snippet = func.ts_headline(REGCONFIG, doc, tsq, "StartSel=**,StopSel=**").label("snippet")

    if title_only:
        tsv = None
        # only the title similarity counts
        rank = sim.label("rank")
    else:
        # the tsvector object that we want to search against with our tsquery
        tsv = _build_tsv(A, B, C, D)
        # ranking algo, weigh the similarity a lot, the text-based ranking less
        rank = (TRI_SIMILARITY_WEIGHT * sim + func.ts_rank_cd(tsv, tsq)).label("rank")

    def execute_search_statement(session, orig_statement):
        """
        Does the right search filtering, limiting, and ordering for the initial statement
        """
        similar_enough = sim > TRI_SIMILARITY_THRESHOLD
        if title_only:
            matches = similar_enough
        else:
            matches = or_(tsv.op("@@")(tsq), similar_enough)
        rank_cutoff = rank <= next_rank if next_rank is not None else True
        limited = orig_statement.where(matches).where(rank_cutoff).order_by(rank.desc()).limit(page_size + 1)
        return session.execute(limited).all()

    return rank, snippet, execute_search_statement

189 

190 

191def _search_users(session, search_statement, title_only, next_rank, page_size, context, include_users): 

192 if not include_users: 

193 return [] 

194 rank, snippet, execute_search_statement = _gen_search_elements( 

195 search_statement, 

196 title_only, 

197 next_rank, 

198 page_size, 

199 [User.username, User.name], 

200 [User.city], 

201 [User.about_me], 

202 [User.things_i_like, User.about_place, User.additional_information], 

203 ) 

204 

205 users = execute_search_statement(session, select(User, rank, snippet).where_users_visible(context)) 

206 

207 return [ 

208 search_pb2.Result( 

209 rank=rank, 

210 user=user_model_to_pb(page, session, context), 

211 snippet=snippet, 

212 ) 

213 for page, rank, snippet in users 

214 ] 

215 

216 

217def _search_pages(session, search_statement, title_only, next_rank, page_size, context, include_places, include_guides): 

218 rank, snippet, execute_search_statement = _gen_search_elements( 

219 search_statement, 

220 title_only, 

221 next_rank, 

222 page_size, 

223 [PageVersion.title], 

224 [PageVersion.address], 

225 [], 

226 [PageVersion.content], 

227 ) 

228 if not include_places and not include_guides: 

229 return [] 

230 

231 latest_pages = ( 

232 select(func.max(PageVersion.id).label("id")) 

233 .join(Page, Page.id == PageVersion.page_id) 

234 .where( 

235 or_( 

236 (Page.type == PageType.place) if include_places else False, 

237 (Page.type == PageType.guide) if include_guides else False, 

238 ) 

239 ) 

240 .group_by(PageVersion.page_id) 

241 .subquery() 

242 ) 

243 

244 pages = execute_search_statement( 

245 session, 

246 select(Page, rank, snippet) 

247 .join(PageVersion, PageVersion.page_id == Page.id) 

248 .join(latest_pages, latest_pages.c.id == PageVersion.id), 

249 ) 

250 

251 return [ 

252 search_pb2.Result( 

253 rank=rank, 

254 place=page_to_pb(session, page, context) if page.type == PageType.place else None, 

255 guide=page_to_pb(session, page, context) if page.type == PageType.guide else None, 

256 snippet=snippet, 

257 ) 

258 for page, rank, snippet in pages 

259 ] 

260 

261 

def _search_events(session, search_statement, title_only, next_rank, page_size, context):
    """
    Searches event occurrences that have not ended yet and are not deleted.

    The event title is the title (A), the occurrence address and link are B, and the occurrence content is D.

    Returns a list of search_pb2.Result with the `event` field set.
    """
    rank, snippet, execute_search_statement = _gen_search_elements(
        search_statement,
        title_only,
        next_rank,
        page_size,
        [Event.title],
        [EventOccurrence.address, EventOccurrence.link],
        [],
        [EventOccurrence.content],
    )

    occurrences = execute_search_statement(
        session,
        select(EventOccurrence, rank, snippet)
        .join(Event, Event.id == EventOccurrence.event_id)
        # keep deleted occurrences out of the results, consistent with EventSearch
        .where(~EventOccurrence.is_deleted)
        .where(EventOccurrence.end_time >= func.now()),
    )

    return [
        search_pb2.Result(
            rank=rank,
            event=event_to_pb(session, occurrence, context),
            snippet=snippet,
        )
        for occurrence, rank, snippet in occurrences
    ]

289 

290 

291def _search_clusters( 

292 session, search_statement, title_only, next_rank, page_size, context, include_communities, include_groups 

293): 

294 if not include_communities and not include_groups: 

295 return [] 

296 

297 rank, snippet, execute_search_statement = _gen_search_elements( 

298 search_statement, 

299 title_only, 

300 next_rank, 

301 page_size, 

302 [Cluster.name], 

303 [PageVersion.address, PageVersion.title], 

304 [Cluster.description], 

305 [PageVersion.content], 

306 ) 

307 

308 latest_pages = ( 

309 select(func.max(PageVersion.id).label("id")) 

310 .join(Page, Page.id == PageVersion.page_id) 

311 .where(Page.type == PageType.main_page) 

312 .group_by(PageVersion.page_id) 

313 .subquery() 

314 ) 

315 

316 clusters = execute_search_statement( 

317 session, 

318 select(Cluster, rank, snippet) 

319 .join(Page, Page.owner_cluster_id == Cluster.id) 

320 .join(PageVersion, PageVersion.page_id == Page.id) 

321 .join(latest_pages, latest_pages.c.id == PageVersion.id) 

322 .where(Cluster.is_official_cluster if include_communities and not include_groups else True) 

323 .where(~Cluster.is_official_cluster if not include_communities and include_groups else True), 

324 ) 

325 

326 return [ 

327 search_pb2.Result( 

328 rank=rank, 

329 community=( 

330 community_to_pb(session, cluster.official_cluster_for_node, context) 

331 if cluster.is_official_cluster 

332 else None 

333 ), 

334 group=group_to_pb(session, cluster, context) if not cluster.is_official_cluster else None, 

335 snippet=snippet, 

336 ) 

337 for cluster, rank, snippet in clusters 

338 ] 

339 

340 

class Search(search_pb2_grpc.SearchServicer):
    """
    Servicer implementing the Search, UserSearch, and EventSearch RPCs.
    """

    def Search(self, request, context, session):
        """
        Full text search over users, pages, events, and clusters, merged into one list ordered by descending rank.

        Each helper is asked for up to page_size + 1 rows with rank <= the rank stored in the page token (if any).
        The merged list is cut to page_size, and the rank of the first result left out becomes the next page token.
        """
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        # this is not an ideal page token, some results have equal rank (unlikely)
        next_rank = float(request.page_token) if request.page_token else None

        all_results = (
            _search_users(
                session,
                request.query,
                request.title_only,
                next_rank,
                page_size,
                context,
                request.include_users,
            )
            + _search_pages(
                session,
                request.query,
                request.title_only,
                next_rank,
                page_size,
                context,
                request.include_places,
                request.include_guides,
            )
            + _search_events(
                session,
                request.query,
                request.title_only,
                next_rank,
                page_size,
                context,
            )
            + _search_clusters(
                session,
                request.query,
                request.title_only,
                next_rank,
                page_size,
                context,
                request.include_communities,
                request.include_groups,
            )
        )
        all_results.sort(key=lambda result: result.rank, reverse=True)
        return search_pb2.SearchRes(
            results=all_results[:page_size],
            next_page_token=str(all_results[page_size].rank) if len(all_results) > page_size else None,
        )

    def UserSearch(self, request, context, session):
        """
        Filtered search over visible users, ordered by descending recommendation score.

        If exactly_user_ids is given, only those IDs (and visibility) are applied and every other filter is ignored.
        Otherwise all filters present in the request are combined. Filtering by gender aborts with
        FAILED_PRECONDITION unless the caller has strong verification and includes their own gender; an unknown
        search_in_community_id aborts with NOT_FOUND.

        Returns a page of results, an encrypted page token holding the recommendation score to resume from (when
        there are more results), and the total number of users matching the filters.
        """
        user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()

        # Base statement with visibility filter
        statement = select(User).where_users_visible(context)

        # If exactly_user_ids is present, only filter by those IDs and ignore all other filters
        # This is a bit of a hacky feature to help with the frontend map implementation
        if len(request.exactly_user_ids) > 0:
            statement = statement.where(User.id.in_(request.exactly_user_ids))
        else:
            # Apply all the normal filters
            if request.HasField("query"):
                # case-insensitive substring match
                pattern = f"%{request.query.value}%"
                if request.query_name_only:
                    statement = statement.where(or_(User.name.ilike(pattern), User.username.ilike(pattern)))
                else:
                    statement = statement.where(
                        or_(
                            User.name.ilike(pattern),
                            User.username.ilike(pattern),
                            User.city.ilike(pattern),
                            User.hometown.ilike(pattern),
                            User.about_me.ilike(pattern),
                            User.things_i_like.ilike(pattern),
                            User.about_place.ilike(pattern),
                            User.additional_information.ilike(pattern),
                        )
                    )

            if request.HasField("last_active"):
                raw_dt = to_aware_datetime(request.last_active)
                statement = statement.where(User.last_active >= last_active_coarsen(raw_dt))

            if len(request.gender) > 0:
                if not has_strong_verification(session, user):
                    context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NEED_STRONG_VERIFICATION)
                elif user.gender not in request.gender:
                    context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.MUST_INCLUDE_OWN_GENDER)
                else:
                    statement = statement.where(User.gender.in_(request.gender))

            if len(request.hosting_status_filter) > 0:
                statement = statement.where(
                    User.hosting_status.in_([hostingstatus2sql[status] for status in request.hosting_status_filter])
                )
            if len(request.meetup_status_filter) > 0:
                statement = statement.where(
                    User.meetup_status.in_([meetupstatus2sql[status] for status in request.meetup_status_filter])
                )
            if len(request.smoking_location_filter) > 0:
                statement = statement.where(
                    User.smoking_allowed.in_([smokinglocation2sql[loc] for loc in request.smoking_location_filter])
                )
            if len(request.sleeping_arrangement_filter) > 0:
                statement = statement.where(
                    User.sleeping_arrangement.in_(
                        [sleepingarrangement2sql[arr] for arr in request.sleeping_arrangement_filter]
                    )
                )
            if len(request.parking_details_filter) > 0:
                statement = statement.where(
                    User.parking_details.in_([parkingdetails2sql[det] for det in request.parking_details_filter])
                )
            # limits/default could be handled on the front end as well
            min_age = request.age_min.value if request.HasField("age_min") else 18
            max_age = request.age_max.value if request.HasField("age_max") else 200

            statement = statement.where((User.age >= min_age) & (User.age <= max_age))

            # a user matches if, for at least one requested language, they have that language code at the
            # requested fluency or above
            if len(request.language_ability_filter) > 0:
                language_options = []
                for ability_filter in request.language_ability_filter:
                    fluency_sql_value = fluency2sql.get(ability_filter.fluency)

                    # entries whose fluency has no SQL mapping are skipped
                    if fluency_sql_value is None:
                        continue
                    language_options.append(
                        and_(
                            (LanguageAbility.language_code == ability_filter.code),
                            (LanguageAbility.fluency >= (fluency_sql_value)),
                        )
                    )
                # Filter through a subquery instead of joining LanguageAbility: a join returns (and counts) a
                # user once per matching language row. Skipping the filter when no entry was usable also avoids
                # calling or_() without arguments.
                if language_options:
                    statement = statement.where(
                        User.id.in_(select(LanguageAbility.user_id).where(or_(*language_options)))
                    )

            if request.HasField("profile_completed"):
                statement = statement.where(User.has_completed_profile == request.profile_completed.value)
            if request.HasField("guests"):
                statement = statement.where(User.max_guests >= request.guests.value)
            if request.HasField("last_minute"):
                statement = statement.where(User.last_minute == request.last_minute.value)
            if request.HasField("has_pets"):
                statement = statement.where(User.has_pets == request.has_pets.value)
            if request.HasField("accepts_pets"):
                statement = statement.where(User.accepts_pets == request.accepts_pets.value)
            if request.HasField("has_kids"):
                statement = statement.where(User.has_kids == request.has_kids.value)
            if request.HasField("accepts_kids"):
                statement = statement.where(User.accepts_kids == request.accepts_kids.value)
            if request.HasField("has_housemates"):
                statement = statement.where(User.has_housemates == request.has_housemates.value)
            if request.HasField("wheelchair_accessible"):
                statement = statement.where(User.wheelchair_accessible == request.wheelchair_accessible.value)
            if request.HasField("smokes_at_home"):
                statement = statement.where(User.smokes_at_home == request.smokes_at_home.value)
            if request.HasField("drinking_allowed"):
                statement = statement.where(User.drinking_allowed == request.drinking_allowed.value)
            if request.HasField("drinks_at_home"):
                statement = statement.where(User.drinks_at_home == request.drinks_at_home.value)
            if request.HasField("parking"):
                statement = statement.where(User.parking == request.parking.value)
            if request.HasField("camping_ok"):
                statement = statement.where(User.camping_ok == request.camping_ok.value)

            if request.HasField("search_in_area"):
                # EPSG4326 measures distance in decimal degrees
                # we want to check whether two circles overlap, so check if the distance between their centers is less
                # than the sum of their radii, divided by 111111 m ~= 1 degree (at the equator)
                search_point = create_coordinate(request.search_in_area.lat, request.search_in_area.lng)
                statement = statement.where(
                    func.ST_DWithin(
                        # old:
                        # User.geom, search_point, (User.geom_radius + request.search_in_area.radius) / 111111
                        # this is an optimization that speeds up the db queries since it doesn't need to look up the
                        # user's geom radius
                        User.geom,
                        search_point,
                        (1000 + request.search_in_area.radius) / 111111,
                    )
                )
            if request.HasField("search_in_rectangle"):
                statement = statement.where(
                    func.ST_Within(
                        User.geom,
                        func.ST_MakeEnvelope(
                            request.search_in_rectangle.lng_min,
                            request.search_in_rectangle.lat_min,
                            request.search_in_rectangle.lng_max,
                            request.search_in_rectangle.lat_max,
                            4326,
                        ),
                    )
                )
            if request.HasField("search_in_community_id"):
                # could do a join here as well, but this is just simpler
                node = session.execute(
                    select(Node).where(Node.id == request.search_in_community_id)
                ).scalar_one_or_none()
                if not node:
                    context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
                statement = statement.where(func.ST_Contains(node.geom, User.geom))

            if request.only_with_references:
                # distinct, so that the join yields at most one row per user
                references = (
                    select(Reference.to_user_id.label("user_id"))
                    .where_users_column_visible(context, Reference.from_user_id)
                    .distinct()
                    .subquery()
                )
                statement = statement.join(references, references.c.user_id == User.id)

            if request.only_with_strong_verification:
                statement = statement.join(
                    StrongVerificationAttempt,
                    and_(
                        StrongVerificationAttempt.user_id == User.id,
                        StrongVerificationAttempt.has_strong_verification(User),
                    ),
                )
            # TODO:
            # bool friends_only = 13;

        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        # without a page token, start from a score above anything in use
        next_recommendation_score = float(decrypt_page_token(request.page_token)) if request.page_token else 1e10
        # counted before the pagination cutoff so that it covers every page
        total_items = session.execute(select(func.count()).select_from(statement.subquery())).scalar()

        statement = (
            statement.where(User.recommendation_score <= next_recommendation_score)
            .order_by(User.recommendation_score.desc())
            .limit(page_size + 1)
        )
        users = session.execute(statement).scalars().all()

        return search_pb2.UserSearchRes(
            results=[
                search_pb2.Result(
                    rank=1,
                    user=user_model_to_pb(user, session, context),
                )
                for user in users[:page_size]
            ],
            # the extra row fetched past the page (if any) is where the next page starts
            next_page_token=(
                encrypt_page_token(str(users[-1].recommendation_score)) if len(users) > page_size else None
            ),
            total_items=total_items,
        )

    def EventSearch(self, request, context, session):
        """
        Filtered search over non-deleted event occurrences.

        Upcoming events are ordered by ascending start time; with request.past set, past events are ordered by
        descending start time. Pagination works either by page_number (offset based) or by page_token (a unix
        timestamp in milliseconds); page_token is ignored when page_number is set. An unknown
        search_in_community_id aborts with NOT_FOUND.

        Returns a page of events, the next page token (only in token mode, when there are more results), and the
        total number of matching occurrences.
        """
        statement = (
            select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted)
        )

        if request.HasField("query"):
            # case-insensitive substring match
            pattern = f"%{request.query.value}%"
            if request.query_title_only:
                statement = statement.where(Event.title.ilike(pattern))
            else:
                statement = statement.where(
                    or_(
                        Event.title.ilike(pattern),
                        EventOccurrence.content.ilike(pattern),
                        EventOccurrence.address.ilike(pattern),
                    )
                )

        # occurrences without a geom are treated as online; `== None` / `!= None` are deliberate here, SQLAlchemy
        # turns them into IS NULL / IS NOT NULL
        if request.only_online:
            statement = statement.where(EventOccurrence.geom == None)
        elif request.only_offline:
            statement = statement.where(EventOccurrence.geom != None)

        if request.subscribed or request.attending or request.organizing or request.my_communities:
            # the relationship filters are OR-ed together: an occurrence needs to satisfy at least one of them
            where_ = []

            if request.subscribed:
                statement = statement.outerjoin(
                    EventSubscription,
                    and_(EventSubscription.event_id == Event.id, EventSubscription.user_id == context.user_id),
                )
                where_.append(EventSubscription.user_id != None)
            if request.organizing:
                statement = statement.outerjoin(
                    EventOrganizer,
                    and_(EventOrganizer.event_id == Event.id, EventOrganizer.user_id == context.user_id),
                )
                where_.append(EventOrganizer.user_id != None)
            if request.attending:
                statement = statement.outerjoin(
                    EventOccurrenceAttendee,
                    and_(
                        EventOccurrenceAttendee.occurrence_id == EventOccurrence.id,
                        EventOccurrenceAttendee.user_id == context.user_id,
                    ),
                )
                where_.append(EventOccurrenceAttendee.user_id != None)
            if request.my_communities:
                # nodes whose official cluster the user is subscribed to
                my_communities = (
                    session.execute(
                        select(Node.id)
                        .join(Cluster, Cluster.parent_node_id == Node.id)
                        .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
                        .where(ClusterSubscription.user_id == context.user_id)
                        .where(Cluster.is_official_cluster)
                        .order_by(Node.id)
                        .limit(100000)
                    )
                    .scalars()
                    .all()
                )
                where_.append(Event.parent_node_id.in_(my_communities))

            statement = statement.where(or_(*where_))

        if not request.include_cancelled:
            statement = statement.where(~EventOccurrence.is_cancelled)

        if request.HasField("search_in_area"):
            # EPSG4326 measures distance in decimal degrees, so the distance in metres is divided by
            # 111111 m ~= 1 degree (at the equator)
            # a fixed 1000 m is added to the requested radius, the same constant UserSearch uses
            search_point = create_coordinate(request.search_in_area.lat, request.search_in_area.lng)
            statement = statement.where(
                func.ST_DWithin(
                    EventOccurrence.geom,
                    search_point,
                    (1000 + request.search_in_area.radius) / 111111,
                )
            )
        if request.HasField("search_in_rectangle"):
            statement = statement.where(
                func.ST_Within(
                    EventOccurrence.geom,
                    func.ST_MakeEnvelope(
                        request.search_in_rectangle.lng_min,
                        request.search_in_rectangle.lat_min,
                        request.search_in_rectangle.lng_max,
                        request.search_in_rectangle.lat_max,
                        4326,
                    ),
                )
            )
        if request.HasField("search_in_community_id"):
            # could do a join here as well, but this is just simpler
            node = session.execute(select(Node).where(Node.id == request.search_in_community_id)).scalar_one_or_none()
            if not node:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
            statement = statement.where(func.ST_Contains(node.geom, EventOccurrence.geom))

        if request.HasField("after"):
            statement = statement.where(EventOccurrence.start_time > to_aware_datetime(request.after))
        if request.HasField("before"):
            statement = statement.where(EventOccurrence.end_time < to_aware_datetime(request.before))

        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        # the page token is a unix timestamp (in milliseconds) of where we left off; it is ignored when paginating
        # by page number
        page_token = (
            dt_from_millis(int(request.page_token)) if request.page_token and not request.page_number else now()
        )
        page_number = request.page_number or 1
        # Calculate the offset for pagination
        offset = (page_number - 1) * page_size

        # NOTE(review): rows are ordered by start_time but the token cutoff is on end_time, so an occurrence that
        # starts early and ends late may show up again on a later token-based page -- confirm whether this matters
        if not request.past:
            statement = statement.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
                EventOccurrence.start_time.asc()
            )
        else:
            statement = statement.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
                EventOccurrence.start_time.desc()
            )

        total_items = session.execute(select(func.count()).select_from(statement.subquery())).scalar()
        # Apply pagination by page number; in token mode fetch one extra row to detect whether there is a next page
        statement = statement.offset(offset).limit(page_size) if request.page_number else statement.limit(page_size + 1)
        occurrences = session.execute(statement).scalars().all()

        return search_pb2.EventSearchRes(
            events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
            next_page_token=(str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None),
            total_items=total_items,
        )

729 )