Coverage for src/couchers/servicers/search.py: 85%

271 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-08-28 14:55 +0000

1""" 

2See //docs/search.md for overview. 

3""" 

4 

5from datetime import timedelta 

6 

7import grpc 

8from sqlalchemy.sql import and_, func, or_ 

9 

10from couchers import errors, urls 

11from couchers.crypto import decrypt_page_token, encrypt_page_token 

12from couchers.helpers.strong_verification import has_strong_verification 

13from couchers.materialized_views import LiteUser, UserResponseRate 

14from couchers.models import ( 

15 Cluster, 

16 ClusterSubscription, 

17 Event, 

18 EventOccurrence, 

19 EventOccurrenceAttendee, 

20 EventOrganizer, 

21 EventSubscription, 

22 LanguageAbility, 

23 Node, 

24 Page, 

25 PageType, 

26 PageVersion, 

27 Reference, 

28 StrongVerificationAttempt, 

29 User, 

30) 

31from couchers.reranker import reranker 

32from couchers.servicers.api import ( 

33 fluency2sql, 

34 get_num_references, 

35 hostingstatus2api, 

36 hostingstatus2sql, 

37 meetupstatus2api, 

38 meetupstatus2sql, 

39 parkingdetails2sql, 

40 response_rate_to_pb, 

41 sleepingarrangement2sql, 

42 smokinglocation2sql, 

43 user_model_to_pb, 

44) 

45from couchers.servicers.communities import community_to_pb 

46from couchers.servicers.events import event_to_pb 

47from couchers.servicers.groups import group_to_pb 

48from couchers.servicers.pages import page_to_pb 

49from couchers.sql import couchers_select as select 

50from couchers.utils import ( 

51 Timestamp_from_datetime, 

52 create_coordinate, 

53 dt_from_millis, 

54 get_coordinates, 

55 last_active_coarsen, 

56 millis_from_dt, 

57 now, 

58 to_aware_datetime, 

59) 

60from proto import search_pb2, search_pb2_grpc 

61 

62# searches are a bit expensive, we'd rather send back a bunch of results at once than lots of small pages 

63MAX_PAGINATION_LENGTH = 100 

64 

65REGCONFIG = "english" 

66TRI_SIMILARITY_THRESHOLD = 0.6 

67TRI_SIMILARITY_WEIGHT = 5 

68 

69 

70def _join_with_space(coalesces): 

71 # the objects in coalesces are not strings, so we can't do " ".join(coalesces). They're SQLAlchemy magic. 

72 if not coalesces: 

73 return "" 

74 out = coalesces[0] 

75 for coalesce in coalesces[1:]: 

76 out += " " + coalesce 

77 return out 

78 

79 

def _build_tsv(A, B=None, C=None, D=None):
    """
    Builds a weighted tsvector expression out of the field lists A, B, C, and D.

    Every list that takes part is NULL-coalesced, joined with spaces, passed through to_tsvector, and tagged with
    the weight letter of the same name. A always takes part; B, C, and D are appended only when non-empty.
    """

    def weighted(fields, weight):
        # one weighted tsvector for a single group of fields
        text = _join_with_space([func.coalesce(field, "") for field in fields])
        return func.setweight(func.to_tsvector(REGCONFIG, text), weight)

    tsv = weighted(A, "A")
    for fields, weight in ((B, "B"), (C, "C"), (D, "D")):
        if fields:
            tsv = tsv.concat(weighted(fields, weight))
    return tsv

101 

102 

def _build_doc(A, B=None, C=None, D=None):
    """
    Builds the raw document expression (no to_tsvector, no weighting) that snippets are extracted from.

    The fields of A come first, followed by the fields of whichever of B, C, and D are non-empty; everything is
    NULL-coalesced and separated by single spaces.
    """
    # A always takes part, the other groups only when they have fields
    groups = [A] + [fields for fields in (B, C, D) if fields]
    sections = [_join_with_space([func.coalesce(field, "") for field in fields]) for fields in groups]
    return _join_with_space(sections)

118 

119 

def _similarity(statement, text):
    """
    Returns the word_similarity expression between the search string and the given text, unaccenting both first.
    """
    needle = func.unaccent(statement)
    haystack = func.unaccent(text)
    return func.word_similarity(needle, haystack)

122 

123 

def _gen_search_elements(statement, title_only, next_rank, page_size, A, B=None, C=None, D=None):
    """
    Given a search string and four sets of fields (A, B, C, D), generates the postgres expressions for full text
    search.

    The four sets are in decreasing order of "importance" for ranking; A should be the "title", the others can be
    anything.

    If title_only=True, results are matched and ranked purely by trigram similarity against A. Otherwise a result
    matches if either the tsquery matches the weighted tsvector or the title is similar enough, and the rank
    combines both signals.

    Returns a tuple (rank, snippet, execute_search_statement): the labelled rank and snippet expressions to select,
    and a function (session, orig_statement) that applies the search filter, the rank-based page cut-off, the
    ordering, and the limit to orig_statement and returns all rows.
    """
    B = B or []
    C = C or []
    D = D or []

    # a postgres tsquery object; matches against the tsvector in the full search, and drives the headline in both modes
    tsq = func.websearch_to_tsquery(REGCONFIG, statement)

    # trigram based text similarity between the title and the search string
    sim = _similarity(statement, _build_doc(A))

    # the snippet with results highlighted, generated from the whole document
    snippet = func.ts_headline(REGCONFIG, _build_doc(A, B, C, D), tsq, "StartSel=**,StopSel=**").label("snippet")

    if title_only:
        rank = sim.label("rank")
        matches = sim > TRI_SIMILARITY_THRESHOLD
    else:
        # the tsvector object that we want to search against with our tsquery
        tsv = _build_tsv(A, B, C, D)
        # ranking algo, weigh the similarity a lot, the text-based ranking less
        rank = (TRI_SIMILARITY_WEIGHT * sim + func.ts_rank_cd(tsv, tsq)).label("rank")
        matches = or_(tsv.op("@@")(tsq), sim > TRI_SIMILARITY_THRESHOLD)

    def execute_search_statement(session, orig_statement):
        """
        Does the right search filtering, limiting, and ordering for the initial statement
        """
        query = orig_statement.where(matches)
        query = query.where(rank <= next_rank if next_rank is not None else True)
        # one extra row is fetched so the caller can tell whether another page exists
        query = query.order_by(rank.desc()).limit(page_size + 1)
        return session.execute(query).all()

    return rank, snippet, execute_search_statement

197 

198 

199def _search_users(session, search_statement, title_only, next_rank, page_size, context, include_users): 

200 if not include_users: 

201 return [] 

202 rank, snippet, execute_search_statement = _gen_search_elements( 

203 search_statement, 

204 title_only, 

205 next_rank, 

206 page_size, 

207 [User.username, User.name], 

208 [User.city], 

209 [User.about_me], 

210 [User.things_i_like, User.about_place, User.additional_information], 

211 ) 

212 

213 users = execute_search_statement(session, select(User, rank, snippet).where_users_visible(context)) 

214 

215 return [ 

216 search_pb2.Result( 

217 rank=rank, 

218 user=user_model_to_pb(page, session, context), 

219 snippet=snippet, 

220 ) 

221 for page, rank, snippet in users 

222 ] 

223 

224 

225def _search_pages(session, search_statement, title_only, next_rank, page_size, context, include_places, include_guides): 

226 rank, snippet, execute_search_statement = _gen_search_elements( 

227 search_statement, 

228 title_only, 

229 next_rank, 

230 page_size, 

231 [PageVersion.title], 

232 [PageVersion.address], 

233 [], 

234 [PageVersion.content], 

235 ) 

236 if not include_places and not include_guides: 

237 return [] 

238 

239 latest_pages = ( 

240 select(func.max(PageVersion.id).label("id")) 

241 .join(Page, Page.id == PageVersion.page_id) 

242 .where( 

243 or_( 

244 (Page.type == PageType.place) if include_places else False, 

245 (Page.type == PageType.guide) if include_guides else False, 

246 ) 

247 ) 

248 .group_by(PageVersion.page_id) 

249 .subquery() 

250 ) 

251 

252 pages = execute_search_statement( 

253 session, 

254 select(Page, rank, snippet) 

255 .join(PageVersion, PageVersion.page_id == Page.id) 

256 .join(latest_pages, latest_pages.c.id == PageVersion.id), 

257 ) 

258 

259 return [ 

260 search_pb2.Result( 

261 rank=rank, 

262 place=page_to_pb(session, page, context) if page.type == PageType.place else None, 

263 guide=page_to_pb(session, page, context) if page.type == PageType.guide else None, 

264 snippet=snippet, 

265 ) 

266 for page, rank, snippet in pages 

267 ] 

268 

269 

def _search_events(session, search_statement, title_only, next_rank, page_size, context):
    """
    Full text search over event occurrences that have not ended yet.

    The event title acts as the search title, followed by the occurrence address and link, then the occurrence
    content. Deleted occurrences are excluded, matching the filter EventSearch applies. Returns a list of
    search_pb2.Result with `event` set.
    """
    rank, snippet, execute_search_statement = _gen_search_elements(
        search_statement,
        title_only,
        next_rank,
        page_size,
        [Event.title],
        [EventOccurrence.address, EventOccurrence.link],
        [],
        [EventOccurrence.content],
    )

    occurrences = execute_search_statement(
        session,
        select(EventOccurrence, rank, snippet)
        .join(Event, Event.id == EventOccurrence.event_id)
        # deleted occurrences must not surface in search results
        .where(~EventOccurrence.is_deleted)
        .where(EventOccurrence.end_time >= func.now()),
    )

    return [
        search_pb2.Result(
            rank=rank,
            event=event_to_pb(session, occurrence, context),
            snippet=snippet,
        )
        for occurrence, rank, snippet in occurrences
    ]

297 

298 

299def _search_clusters( 

300 session, search_statement, title_only, next_rank, page_size, context, include_communities, include_groups 

301): 

302 if not include_communities and not include_groups: 

303 return [] 

304 

305 rank, snippet, execute_search_statement = _gen_search_elements( 

306 search_statement, 

307 title_only, 

308 next_rank, 

309 page_size, 

310 [Cluster.name], 

311 [PageVersion.address, PageVersion.title], 

312 [Cluster.description], 

313 [PageVersion.content], 

314 ) 

315 

316 latest_pages = ( 

317 select(func.max(PageVersion.id).label("id")) 

318 .join(Page, Page.id == PageVersion.page_id) 

319 .where(Page.type == PageType.main_page) 

320 .group_by(PageVersion.page_id) 

321 .subquery() 

322 ) 

323 

324 clusters = execute_search_statement( 

325 session, 

326 select(Cluster, rank, snippet) 

327 .join(Page, Page.owner_cluster_id == Cluster.id) 

328 .join(PageVersion, PageVersion.page_id == Page.id) 

329 .join(latest_pages, latest_pages.c.id == PageVersion.id) 

330 .where(Cluster.is_official_cluster if include_communities and not include_groups else True) 

331 .where(~Cluster.is_official_cluster if not include_communities and include_groups else True), 

332 ) 

333 

334 return [ 

335 search_pb2.Result( 

336 rank=rank, 

337 community=( 

338 community_to_pb(session, cluster.official_cluster_for_node, context) 

339 if cluster.is_official_cluster 

340 else None 

341 ), 

342 group=group_to_pb(session, cluster, context) if not cluster.is_official_cluster else None, 

343 snippet=snippet, 

344 ) 

345 for cluster, rank, snippet in clusters 

346 ] 

347 

348 

def _user_search_inner(request, context, session):
    """
    Applies the filters and pagination shared by the user search endpoints.

    Only users that are visible in this context and present in LiteUser are considered. Results are ordered by
    recommendation score, highest first, and the page token is an encrypted recommendation score to continue from.

    Returns a tuple (user_ids, next_page_token, total_items): at most page_size user ids, the token for the next
    page (None if there is none), and the number of users matching the filters before pagination.

    Aborts with FAILED_PRECONDITION if a gender filter is used without strong verification or without including
    the searcher's own gender, and with NOT_FOUND if search_in_community_id does not exist.
    """
    user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()

    # Base statement with visibility filter
    statement = select(User.id, User.recommendation_score).where_users_visible(context)
    # make sure that only users who are in LiteUser show up
    statement = statement.join(LiteUser, LiteUser.id == User.id)

    # If exactly_user_ids is present, only filter by those IDs and ignore all other filters
    # This is a bit of a hacky feature to help with the frontend map implementation
    if len(request.exactly_user_ids) > 0:
        statement = statement.where(User.id.in_(request.exactly_user_ids))
    else:
        # Apply all the normal filters
        if request.HasField("query"):
            pattern = f"%{request.query.value}%"
            searched_columns = [User.name, User.username]
            if not request.query_name_only:
                searched_columns += [
                    User.city,
                    User.hometown,
                    User.about_me,
                    User.things_i_like,
                    User.about_place,
                    User.additional_information,
                ]
            statement = statement.where(or_(*[column.ilike(pattern) for column in searched_columns]))

        if request.HasField("last_active"):
            raw_dt = to_aware_datetime(request.last_active)
            statement = statement.where(User.last_active >= last_active_coarsen(raw_dt))

        if len(request.gender) > 0:
            if not has_strong_verification(session, user):
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NEED_STRONG_VERIFICATION)
            elif user.gender not in request.gender:
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.MUST_INCLUDE_OWN_GENDER)
            else:
                statement = statement.where(User.gender.in_(request.gender))

        # repeated enum filters: (requested values, column to filter on, api -> sql mapping)
        enum_filters = (
            (request.hosting_status_filter, User.hosting_status, hostingstatus2sql),
            (request.meetup_status_filter, User.meetup_status, meetupstatus2sql),
            (request.smoking_location_filter, User.smoking_allowed, smokinglocation2sql),
            (request.sleeping_arrangement_filter, User.sleeping_arrangement, sleepingarrangement2sql),
            (request.parking_details_filter, User.parking_details, parkingdetails2sql),
        )
        for requested, column, to_sql in enum_filters:
            if len(requested) > 0:
                statement = statement.where(column.in_([to_sql[value] for value in requested]))

        # limits/default could be handled on the front end as well
        min_age = request.age_min.value if request.HasField("age_min") else 18
        max_age = request.age_max.value if request.HasField("age_max") else 200

        statement = statement.where((User.age >= min_age) & (User.age <= max_age))

        # a user matches if they speak at least one of the requested languages at the requested fluency or better
        if len(request.language_ability_filter) > 0:
            language_options = []
            for ability_filter in request.language_ability_filter:
                fluency_sql_value = fluency2sql.get(ability_filter.fluency)

                # requested fluencies without an sql equivalent are skipped
                if fluency_sql_value is None:
                    continue
                language_options.append(
                    and_(
                        (LanguageAbility.language_code == ability_filter.code),
                        (LanguageAbility.fluency >= (fluency_sql_value)),
                    )
                )
            # A subquery rather than a join: joining would yield one row per matching language, duplicating
            # users that match several of the requested languages and inflating total_items. Skipped entirely
            # when no usable option remains, so or_() is never called without arguments.
            if language_options:
                statement = statement.where(
                    User.id.in_(select(LanguageAbility.user_id).where(or_(*language_options)))
                )

        if request.HasField("profile_completed"):
            statement = statement.where(User.has_completed_profile == request.profile_completed.value)
        if request.HasField("guests"):
            statement = statement.where(User.max_guests >= request.guests.value)

        # optional boolean-style filters whose request field and User column share a name
        same_name_fields = (
            "last_minute",
            "has_pets",
            "accepts_pets",
            "has_kids",
            "accepts_kids",
            "has_housemates",
            "wheelchair_accessible",
            "smokes_at_home",
            "drinking_allowed",
            "drinks_at_home",
            "parking",
            "camping_ok",
        )
        for field_name in same_name_fields:
            if request.HasField(field_name):
                statement = statement.where(getattr(User, field_name) == getattr(request, field_name).value)

        if request.HasField("search_in_area"):
            # EPSG4326 measures distance in decimal degrees
            # we want to check whether two circles overlap, so check if the distance between their centers is less
            # than the sum of their radii, divided by 111111 m ~= 1 degree (at the equator)
            search_point = create_coordinate(request.search_in_area.lat, request.search_in_area.lng)
            statement = statement.where(
                func.ST_DWithin(
                    # old:
                    # User.geom, search_point, (User.geom_radius + request.search_in_area.radius) / 111111
                    # this is an optimization that speeds up the db queries since it doesn't need to look up the
                    # user's geom radius
                    User.geom,
                    search_point,
                    (1000 + request.search_in_area.radius) / 111111,
                )
            )
        if request.HasField("search_in_rectangle"):
            statement = statement.where(
                func.ST_Within(
                    User.geom,
                    func.ST_MakeEnvelope(
                        request.search_in_rectangle.lng_min,
                        request.search_in_rectangle.lat_min,
                        request.search_in_rectangle.lng_max,
                        request.search_in_rectangle.lat_max,
                        4326,
                    ),
                )
            )
        if request.HasField("search_in_community_id"):
            # could do a join here as well, but this is just simpler
            node = session.execute(select(Node).where(Node.id == request.search_in_community_id)).scalar_one_or_none()
            if not node:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
            statement = statement.where(func.ST_Contains(node.geom, User.geom))

        if request.only_with_references:
            references = (
                select(Reference.to_user_id.label("user_id"))
                .where_users_column_visible(context, Reference.from_user_id)
                .distinct()
                .subquery()
            )
            statement = statement.join(references, references.c.user_id == User.id)

        if request.only_with_strong_verification:
            statement = statement.join(
                StrongVerificationAttempt,
                and_(
                    StrongVerificationAttempt.user_id == User.id,
                    StrongVerificationAttempt.has_strong_verification(User),
                ),
            )
        # TODO:
        # bool friends_only = 13;

    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    next_recommendation_score = float(decrypt_page_token(request.page_token)) if request.page_token else 1e10
    # counted before the page cut-off below, so this is the size of the whole result set
    total_items = session.execute(select(func.count()).select_from(statement.subquery())).scalar()

    statement = (
        statement.where(User.recommendation_score <= next_recommendation_score)
        .order_by(User.recommendation_score.desc())
        # one extra row tells us whether there is a next page and which score it starts at
        .limit(page_size + 1)
    )
    rows = session.execute(statement).all()
    user_ids = [user_id for user_id, _ in rows]
    next_page_token = encrypt_page_token(str(rows[-1][1])) if len(rows) > page_size else None
    return user_ids[:page_size], next_page_token, total_items

541 

542 

class Search(search_pb2_grpc.SearchServicer):
    """
    gRPC servicer for the search endpoints: combined full text search, user search (in two response formats), and
    event search.
    """

    def Search(self, request, context, session):
        """
        Full text search across users, pages, events, and clusters, merged into a single list ordered by rank.
        """
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        # this is not an ideal page token, some results have equal rank (unlikely)
        next_rank = float(request.page_token) if request.page_token else None

        # leading arguments shared by every per-type search helper
        common = (session, request.query, request.title_only, next_rank, page_size, context)

        all_results = (
            _search_users(*common, request.include_users)
            + _search_pages(*common, request.include_places, request.include_guides)
            + _search_events(*common)
            + _search_clusters(*common, request.include_communities, request.include_groups)
        )
        all_results.sort(key=lambda result: result.rank, reverse=True)

        # the rank of the first result that did not fit on this page is where the next page starts
        has_more = len(all_results) > page_size
        return search_pb2.SearchRes(
            results=all_results[:page_size],
            next_page_token=str(all_results[page_size].rank) if has_more else None,
        )

    def UserSearch(self, request, context, session):
        """
        Filtered user search returning full user protos, in the order produced by _user_search_inner.
        """
        user_ids_to_return, next_page_token, total_items = _user_search_inner(request, context, session)

        user_rows = session.execute(select(User.id, User).where(User.id.in_(user_ids_to_return))).all()
        users_by_id = {user_id: user for user_id, user in user_rows}

        results = [
            search_pb2.Result(
                rank=1,
                user=user_model_to_pb(users_by_id[user_id], session, context),
            )
            for user_id in user_ids_to_return
        ]
        return search_pb2.UserSearchRes(
            results=results,
            next_page_token=next_page_token,
            total_items=total_items,
        )

    def UserSearchV2(self, request, context, session):
        """
        Filtered user search returning SearchUser messages assembled from LiteUser, response rates, a handful of
        User columns, and reference counts. The assembled results are passed through the reranker before returning.
        """
        user_ids_to_return, next_page_token, total_items = _user_search_inner(request, context, session)

        lite_users = session.execute(select(LiteUser).where(LiteUser.id.in_(user_ids_to_return))).scalars().all()
        lite_user_by_id = {lite_user.id: lite_user for lite_user in lite_users}

        response_rates = (
            session.execute(select(UserResponseRate).where(UserResponseRate.user_id.in_(user_ids_to_return)))
            .scalars()
            .all()
        )
        response_rate_by_id = {response_rate.user_id: response_rate for response_rate in response_rates}

        # the remaining fields are read straight from the User table
        user_rows = session.execute(
            select(
                User.id,
                User.about_me,
                User.gender,
                User.last_active,
                User.hosting_status,
                User.meetup_status,
                User.joined,
            ).where(User.id.in_(user_ids_to_return))
        ).all()
        db_user_data_by_id = {user_id: fields for user_id, *fields in user_rows}

        ref_counts_by_user_id = get_num_references(session, user_ids_to_return)

        def _user_to_search_user(user_id):
            lite_user = lite_user_by_id[user_id]
            about_me, gender, last_active, hosting_status, meetup_status, joined = db_user_data_by_id[user_id]
            lat, lng = get_coordinates(lite_user.geom)

            # avatar urls are only set for users that have an avatar
            if lite_user.avatar_filename:
                avatar_url = urls.media_url(filename=lite_user.avatar_filename, size="full")
                avatar_thumbnail_url = urls.media_url(filename=lite_user.avatar_filename, size="thumbnail")
            else:
                avatar_url = None
                avatar_thumbnail_url = None

            return search_pb2.SearchUser(
                user_id=lite_user.id,
                username=lite_user.username,
                name=lite_user.name,
                city=lite_user.city,
                joined=Timestamp_from_datetime(last_active_coarsen(joined)),
                has_completed_profile=lite_user.has_completed_profile,
                has_completed_my_home=lite_user.has_completed_my_home,
                lat=lat,
                lng=lng,
                profile_snippet=about_me,
                num_references=ref_counts_by_user_id.get(lite_user.id, 0),
                gender=gender,
                age=int(lite_user.age),
                last_active=Timestamp_from_datetime(last_active_coarsen(last_active)),
                hosting_status=hostingstatus2api[hosting_status],
                meetup_status=meetupstatus2api[meetup_status],
                avatar_url=avatar_url,
                avatar_thumbnail_url=avatar_thumbnail_url,
                has_strong_verification=lite_user.has_strong_verification,
                **response_rate_to_pb(response_rate_by_id.get(user_id)),
            )

        results = reranker([_user_to_search_user(user_id) for user_id in user_ids_to_return])

        return search_pb2.UserSearchV2Res(
            results=results,
            next_page_token=next_page_token,
            total_items=total_items,
        )

    def EventSearch(self, request, context, session):
        """
        Filtered search over non-deleted event occurrences, ordered by start time.

        Supports text, online/offline, relationship to the current user (subscribed, organizing, attending, in one
        of my communities), geographic, and time filters. Pagination is either by page token (a unix timestamp in
        milliseconds) or, when page_number is set, by offset.
        """
        statement = (
            select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted)
        )

        if request.HasField("query"):
            pattern = f"%{request.query.value}%"
            if request.query_title_only:
                statement = statement.where(Event.title.ilike(pattern))
            else:
                searched_columns = (Event.title, EventOccurrence.content, EventOccurrence.address)
                statement = statement.where(or_(*[column.ilike(pattern) for column in searched_columns]))

        # an occurrence without a geom is treated as online; "== None" is the SQLAlchemy NULL comparison
        if request.only_online:
            statement = statement.where(EventOccurrence.geom == None)  # noqa: E711
        elif request.only_offline:
            statement = statement.where(EventOccurrence.geom != None)  # noqa: E711

        # each requested relationship adds one condition; an occurrence matches if any of them holds
        relationship_conditions = []

        if request.subscribed:
            statement = statement.outerjoin(
                EventSubscription,
                and_(EventSubscription.event_id == Event.id, EventSubscription.user_id == context.user_id),
            )
            relationship_conditions.append(EventSubscription.user_id != None)  # noqa: E711
        if request.organizing:
            statement = statement.outerjoin(
                EventOrganizer,
                and_(EventOrganizer.event_id == Event.id, EventOrganizer.user_id == context.user_id),
            )
            relationship_conditions.append(EventOrganizer.user_id != None)  # noqa: E711
        if request.attending:
            statement = statement.outerjoin(
                EventOccurrenceAttendee,
                and_(
                    EventOccurrenceAttendee.occurrence_id == EventOccurrence.id,
                    EventOccurrenceAttendee.user_id == context.user_id,
                ),
            )
            relationship_conditions.append(EventOccurrenceAttendee.user_id != None)  # noqa: E711
        if request.my_communities:
            # nodes whose official cluster the current user is subscribed to
            my_communities = (
                session.execute(
                    select(Node.id)
                    .join(Cluster, Cluster.parent_node_id == Node.id)
                    .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
                    .where(ClusterSubscription.user_id == context.user_id)
                    .where(Cluster.is_official_cluster)
                    .order_by(Node.id)
                    .limit(100000)
                )
                .scalars()
                .all()
            )
            relationship_conditions.append(Event.parent_node_id.in_(my_communities))

        if relationship_conditions:
            statement = statement.where(or_(*relationship_conditions))

        if not request.include_cancelled:
            statement = statement.where(~EventOccurrence.is_cancelled)

        if request.HasField("search_in_area"):
            # EPSG4326 measures distance in decimal degrees
            # the search radius is padded by 1000 m and converted to degrees with 111111 m ~= 1 degree (at the
            # equator)
            search_point = create_coordinate(request.search_in_area.lat, request.search_in_area.lng)
            statement = statement.where(
                func.ST_DWithin(
                    EventOccurrence.geom,
                    search_point,
                    (1000 + request.search_in_area.radius) / 111111,
                )
            )
        if request.HasField("search_in_rectangle"):
            rectangle = request.search_in_rectangle
            statement = statement.where(
                func.ST_Within(
                    EventOccurrence.geom,
                    func.ST_MakeEnvelope(
                        rectangle.lng_min,
                        rectangle.lat_min,
                        rectangle.lng_max,
                        rectangle.lat_max,
                        4326,
                    ),
                )
            )
        if request.HasField("search_in_community_id"):
            # could do a join here as well, but this is just simpler
            node = session.execute(select(Node).where(Node.id == request.search_in_community_id)).scalar_one_or_none()
            if not node:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
            statement = statement.where(func.ST_Contains(node.geom, EventOccurrence.geom))

        if request.HasField("after"):
            statement = statement.where(EventOccurrence.start_time > to_aware_datetime(request.after))
        if request.HasField("before"):
            statement = statement.where(EventOccurrence.end_time < to_aware_datetime(request.before))

        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)

        # the page token is a unix timestamp of where we left off; it is ignored when paginating by page number
        if request.page_token and not request.page_number:
            page_token = dt_from_millis(int(request.page_token))
        else:
            page_token = now()

        # NOTE(review): results are ordered by start_time, while the cut-off and the emitted token use end_time;
        # confirm that consecutive token-based pages cannot overlap
        one_second = timedelta(seconds=1)
        if request.past:
            statement = statement.where(EventOccurrence.end_time < page_token + one_second).order_by(
                EventOccurrence.start_time.desc()
            )
        else:
            statement = statement.where(EventOccurrence.end_time > page_token - one_second).order_by(
                EventOccurrence.start_time.asc()
            )

        total_items = session.execute(select(func.count()).select_from(statement.subquery())).scalar()

        if request.page_number:
            # pagination by page number: skip the preceding pages
            statement = statement.offset((request.page_number - 1) * page_size).limit(page_size)
        else:
            # pagination by token: fetch one extra row to learn whether there is a next page
            statement = statement.limit(page_size + 1)
        occurrences = session.execute(statement).scalars().all()

        has_more = len(occurrences) > page_size
        return search_pb2.EventSearchRes(
            events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
            next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if has_more else None,
            total_items=total_items,
        )