Coverage for src/couchers/servicers/search.py: 87%

269 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-05 00:16 +0000

1""" 

2See //docs/search.md for overview. 

3""" 

4 

5from datetime import timedelta 

6 

7import grpc 

8from sqlalchemy.sql import and_, func, or_ 

9 

10from couchers import errors, urls 

11from couchers.crypto import decrypt_page_token, encrypt_page_token 

12from couchers.helpers.strong_verification import has_strong_verification 

13from couchers.materialized_views import LiteUser, UserResponseRate 

14from couchers.models import ( 

15 Cluster, 

16 ClusterSubscription, 

17 Event, 

18 EventOccurrence, 

19 EventOccurrenceAttendee, 

20 EventOrganizer, 

21 EventSubscription, 

22 LanguageAbility, 

23 Node, 

24 Page, 

25 PageType, 

26 PageVersion, 

27 Reference, 

28 StrongVerificationAttempt, 

29 User, 

30) 

31from couchers.reranker import reranker 

32from couchers.servicers.api import ( 

33 fluency2sql, 

34 get_num_references, 

35 hostingstatus2api, 

36 hostingstatus2sql, 

37 meetupstatus2api, 

38 meetupstatus2sql, 

39 parkingdetails2sql, 

40 response_rate_to_pb, 

41 sleepingarrangement2sql, 

42 smokinglocation2sql, 

43 user_model_to_pb, 

44) 

45from couchers.servicers.communities import community_to_pb 

46from couchers.servicers.events import event_to_pb 

47from couchers.servicers.groups import group_to_pb 

48from couchers.servicers.pages import page_to_pb 

49from couchers.sql import couchers_select as select 

50from couchers.utils import ( 

51 Timestamp_from_datetime, 

52 create_coordinate, 

53 dt_from_millis, 

54 get_coordinates, 

55 last_active_coarsen, 

56 millis_from_dt, 

57 now, 

58 to_aware_datetime, 

59) 

60from proto import search_pb2, search_pb2_grpc 

61 

62# searches are a bit expensive, we'd rather send back a bunch of results at once than lots of small pages 

63MAX_PAGINATION_LENGTH = 100 

64 

65REGCONFIG = "english" 

66TRI_SIMILARITY_THRESHOLD = 0.6 

67TRI_SIMILARITY_WEIGHT = 5 

68 

69 

70def _join_with_space(coalesces): 

71 # the objects in coalesces are not strings, so we can't do " ".join(coalesces). They're SQLAlchemy magic. 

72 if not coalesces: 

73 return "" 

74 out = coalesces[0] 

75 for coalesce in coalesces[1:]: 

76 out += " " + coalesce 

77 return out 

78 

79 

80def _build_tsv(A, B=None, C=None, D=None): 

81 """ 

82 Given lists for A, B, C, and D, builds a tsvector from them. 

83 """ 

84 B = B or [] 

85 C = C or [] 

86 D = D or [] 

87 tsv = func.setweight(func.to_tsvector(REGCONFIG, _join_with_space([func.coalesce(bit, "") for bit in A])), "A") 

88 if B: 

89 tsv = tsv.concat( 

90 func.setweight(func.to_tsvector(REGCONFIG, _join_with_space([func.coalesce(bit, "") for bit in B])), "B") 

91 ) 

92 if C: 

93 tsv = tsv.concat( 

94 func.setweight(func.to_tsvector(REGCONFIG, _join_with_space([func.coalesce(bit, "") for bit in C])), "C") 

95 ) 

96 if D: 

97 tsv = tsv.concat( 

98 func.setweight(func.to_tsvector(REGCONFIG, _join_with_space([func.coalesce(bit, "") for bit in D])), "D") 

99 ) 

100 return tsv 

101 

102 

103def _build_doc(A, B=None, C=None, D=None): 

104 """ 

105 Builds the raw document (without to_tsvector and weighting), used for extracting snippet 

106 """ 

107 B = B or [] 

108 C = C or [] 

109 D = D or [] 

110 doc = _join_with_space([func.coalesce(bit, "") for bit in A]) 

111 if B: 

112 doc += " " + _join_with_space([func.coalesce(bit, "") for bit in B]) 

113 if C: 

114 doc += " " + _join_with_space([func.coalesce(bit, "") for bit in C]) 

115 if D: 

116 doc += " " + _join_with_space([func.coalesce(bit, "") for bit in D]) 

117 return doc 

118 

119 

120def _similarity(statement, text): 

121 return func.word_similarity(func.unaccent(statement), func.unaccent(text)) 

122 

123 

124def _gen_search_elements(statement, title_only, next_rank, page_size, A, B=None, C=None, D=None): 

125 """ 

126 Given an sql statement and four sets of fields, (A, B, C, D), generates a bunch of postgres expressions for full text search. 

127 

128 The four sets are in decreasing order of "importance" for ranking. 

129 

130 A should be the "title", the others can be anything. 

131 

132 If title_only=True, we only perform a trigram search against A only 

133 """ 

134 B = B or [] 

135 C = C or [] 

136 D = D or [] 

137 if not title_only: 

138 # a postgres tsquery object that can be used to match against a tsvector 

139 tsq = func.websearch_to_tsquery(REGCONFIG, statement) 

140 

141 # the tsvector object that we want to search against with our tsquery 

142 tsv = _build_tsv(A, B, C, D) 

143 

144 # document to generate snippet from 

145 doc = _build_doc(A, B, C, D) 

146 

147 title = _build_doc(A) 

148 

149 # trigram based text similarity between title and sql statement string 

150 sim = _similarity(statement, title) 

151 

152 # ranking algo, weigh the similarity a lot, the text-based ranking less 

153 rank = (TRI_SIMILARITY_WEIGHT * sim + func.ts_rank_cd(tsv, tsq)).label("rank") 

154 

155 # the snippet with results highlighted 

156 snippet = func.ts_headline(REGCONFIG, doc, tsq, "StartSel=**,StopSel=**").label("snippet") 

157 

158 def execute_search_statement(session, orig_statement): 

159 """ 

160 Does the right search filtering, limiting, and ordering for the initial statement 

161 """ 

162 return session.execute( 

163 orig_statement.where(or_(tsv.op("@@")(tsq), sim > TRI_SIMILARITY_THRESHOLD)) 

164 .where(rank <= next_rank if next_rank is not None else True) 

165 .order_by(rank.desc()) 

166 .limit(page_size + 1) 

167 ).all() 

168 

169 else: 

170 title = _build_doc(A) 

171 

172 # trigram based text similarity between title and sql statement string 

173 sim = _similarity(statement, title) 

174 

175 # ranking algo, weigh the similarity a lot, the text-based ranking less 

176 rank = sim.label("rank") 

177 

178 # used only for headline 

179 tsq = func.websearch_to_tsquery(REGCONFIG, statement) 

180 doc = _build_doc(A, B, C, D) 

181 

182 # the snippet with results highlighted 

183 snippet = func.ts_headline(REGCONFIG, doc, tsq, "StartSel=**,StopSel=**").label("snippet") 

184 

185 def execute_search_statement(session, orig_statement): 

186 """ 

187 Does the right search filtering, limiting, and ordering for the initial statement 

188 """ 

189 return session.execute( 

190 orig_statement.where(sim > TRI_SIMILARITY_THRESHOLD) 

191 .where(rank <= next_rank if next_rank is not None else True) 

192 .order_by(rank.desc()) 

193 .limit(page_size + 1) 

194 ).all() 

195 

196 return rank, snippet, execute_search_statement 

197 

198 

199def _search_users(session, search_statement, title_only, next_rank, page_size, context, include_users): 

200 if not include_users: 

201 return [] 

202 rank, snippet, execute_search_statement = _gen_search_elements( 

203 search_statement, 

204 title_only, 

205 next_rank, 

206 page_size, 

207 [User.username, User.name], 

208 [User.city], 

209 [User.about_me], 

210 [User.things_i_like, User.about_place, User.additional_information], 

211 ) 

212 

213 users = execute_search_statement(session, select(User, rank, snippet).where_users_visible(context)) 

214 

215 return [ 

216 search_pb2.Result( 

217 rank=rank, 

218 user=user_model_to_pb(page, session, context), 

219 snippet=snippet, 

220 ) 

221 for page, rank, snippet in users 

222 ] 

223 

224 

225def _search_pages(session, search_statement, title_only, next_rank, page_size, context, include_places, include_guides): 

226 rank, snippet, execute_search_statement = _gen_search_elements( 

227 search_statement, 

228 title_only, 

229 next_rank, 

230 page_size, 

231 [PageVersion.title], 

232 [PageVersion.address], 

233 [], 

234 [PageVersion.content], 

235 ) 

236 if not include_places and not include_guides: 

237 return [] 

238 

239 latest_pages = ( 

240 select(func.max(PageVersion.id).label("id")) 

241 .join(Page, Page.id == PageVersion.page_id) 

242 .where( 

243 or_( 

244 (Page.type == PageType.place) if include_places else False, 

245 (Page.type == PageType.guide) if include_guides else False, 

246 ) 

247 ) 

248 .group_by(PageVersion.page_id) 

249 .subquery() 

250 ) 

251 

252 pages = execute_search_statement( 

253 session, 

254 select(Page, rank, snippet) 

255 .join(PageVersion, PageVersion.page_id == Page.id) 

256 .join(latest_pages, latest_pages.c.id == PageVersion.id), 

257 ) 

258 

259 return [ 

260 search_pb2.Result( 

261 rank=rank, 

262 place=page_to_pb(session, page, context) if page.type == PageType.place else None, 

263 guide=page_to_pb(session, page, context) if page.type == PageType.guide else None, 

264 snippet=snippet, 

265 ) 

266 for page, rank, snippet in pages 

267 ] 

268 

269 

270def _search_events(session, search_statement, title_only, next_rank, page_size, context): 

271 rank, snippet, execute_search_statement = _gen_search_elements( 

272 search_statement, 

273 title_only, 

274 next_rank, 

275 page_size, 

276 [Event.title], 

277 [EventOccurrence.address, EventOccurrence.link], 

278 [], 

279 [EventOccurrence.content], 

280 ) 

281 

282 occurrences = execute_search_statement( 

283 session, 

284 select(EventOccurrence, rank, snippet) 

285 .join(Event, Event.id == EventOccurrence.event_id) 

286 .where(EventOccurrence.end_time >= func.now()), 

287 ) 

288 

289 return [ 

290 search_pb2.Result( 

291 rank=rank, 

292 event=event_to_pb(session, occurrence, context), 

293 snippet=snippet, 

294 ) 

295 for occurrence, rank, snippet in occurrences 

296 ] 

297 

298 

299def _search_clusters( 

300 session, search_statement, title_only, next_rank, page_size, context, include_communities, include_groups 

301): 

302 if not include_communities and not include_groups: 

303 return [] 

304 

305 rank, snippet, execute_search_statement = _gen_search_elements( 

306 search_statement, 

307 title_only, 

308 next_rank, 

309 page_size, 

310 [Cluster.name], 

311 [PageVersion.address, PageVersion.title], 

312 [Cluster.description], 

313 [PageVersion.content], 

314 ) 

315 

316 latest_pages = ( 

317 select(func.max(PageVersion.id).label("id")) 

318 .join(Page, Page.id == PageVersion.page_id) 

319 .where(Page.type == PageType.main_page) 

320 .group_by(PageVersion.page_id) 

321 .subquery() 

322 ) 

323 

324 clusters = execute_search_statement( 

325 session, 

326 select(Cluster, rank, snippet) 

327 .join(Page, Page.owner_cluster_id == Cluster.id) 

328 .join(PageVersion, PageVersion.page_id == Page.id) 

329 .join(latest_pages, latest_pages.c.id == PageVersion.id) 

330 .where(Cluster.is_official_cluster if include_communities and not include_groups else True) 

331 .where(~Cluster.is_official_cluster if not include_communities and include_groups else True), 

332 ) 

333 

334 return [ 

335 search_pb2.Result( 

336 rank=rank, 

337 community=( 

338 community_to_pb(session, cluster.official_cluster_for_node, context) 

339 if cluster.is_official_cluster 

340 else None 

341 ), 

342 group=group_to_pb(session, cluster, context) if not cluster.is_official_cluster else None, 

343 snippet=snippet, 

344 ) 

345 for cluster, rank, snippet in clusters 

346 ] 

347 

348 

349def _user_search_inner(request, context, session): 

350 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

351 

352 # Base statement with visibility filter 

353 statement = select(User.id, User.recommendation_score).where_users_visible(context) 

354 # make sure that only users who are in LiteUser show up 

355 statement = statement.join(LiteUser, LiteUser.id == User.id) 

356 

357 # If exactly_user_ids is present, only filter by those IDs and ignore all other filters 

358 # This is a bit of a hacky feature to help with the frontend map implementation 

359 if len(request.exactly_user_ids) > 0: 

360 statement = statement.where(User.id.in_(request.exactly_user_ids)) 

361 else: 

362 # Apply all the normal filters 

363 if request.HasField("query"): 

364 if request.query_name_only: 

365 statement = statement.where( 

366 or_(User.name.ilike(f"%{request.query.value}%"), User.username.ilike(f"%{request.query.value}%")) 

367 ) 

368 else: 

369 statement = statement.where( 

370 or_( 

371 User.name.ilike(f"%{request.query.value}%"), 

372 User.username.ilike(f"%{request.query.value}%"), 

373 User.city.ilike(f"%{request.query.value}%"), 

374 User.hometown.ilike(f"%{request.query.value}%"), 

375 User.about_me.ilike(f"%{request.query.value}%"), 

376 User.things_i_like.ilike(f"%{request.query.value}%"), 

377 User.about_place.ilike(f"%{request.query.value}%"), 

378 User.additional_information.ilike(f"%{request.query.value}%"), 

379 ) 

380 ) 

381 

382 if request.HasField("last_active"): 

383 raw_dt = to_aware_datetime(request.last_active) 

384 statement = statement.where(User.last_active >= last_active_coarsen(raw_dt)) 

385 

386 if request.same_gender_only: 

387 if not has_strong_verification(session, user): 

388 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NEED_STRONG_VERIFICATION) 

389 statement = statement.where(User.gender == user.gender) 

390 

391 if len(request.hosting_status_filter) > 0: 

392 statement = statement.where( 

393 User.hosting_status.in_([hostingstatus2sql[status] for status in request.hosting_status_filter]) 

394 ) 

395 if len(request.meetup_status_filter) > 0: 

396 statement = statement.where( 

397 User.meetup_status.in_([meetupstatus2sql[status] for status in request.meetup_status_filter]) 

398 ) 

399 if len(request.smoking_location_filter) > 0: 

400 statement = statement.where( 

401 User.smoking_allowed.in_([smokinglocation2sql[loc] for loc in request.smoking_location_filter]) 

402 ) 

403 if len(request.sleeping_arrangement_filter) > 0: 

404 statement = statement.where( 

405 User.sleeping_arrangement.in_( 

406 [sleepingarrangement2sql[arr] for arr in request.sleeping_arrangement_filter] 

407 ) 

408 ) 

409 if len(request.parking_details_filter) > 0: 

410 statement = statement.where( 

411 User.parking_details.in_([parkingdetails2sql[det] for det in request.parking_details_filter]) 

412 ) 

413 # limits/default could be handled on the front end as well 

414 min_age = request.age_min.value if request.HasField("age_min") else 18 

415 max_age = request.age_max.value if request.HasField("age_max") else 200 

416 

417 statement = statement.where((User.age >= min_age) & (User.age <= max_age)) 

418 

419 # return results with by language code as only input 

420 # fluency in conversational or fluent 

421 

422 if len(request.language_ability_filter) > 0: 

423 language_options = [] 

424 for ability_filter in request.language_ability_filter: 

425 fluency_sql_value = fluency2sql.get(ability_filter.fluency) 

426 

427 if fluency_sql_value is None: 

428 continue 

429 language_options.append( 

430 and_( 

431 (LanguageAbility.language_code == ability_filter.code), 

432 (LanguageAbility.fluency >= (fluency_sql_value)), 

433 ) 

434 ) 

435 statement = statement.join(LanguageAbility, LanguageAbility.user_id == User.id) 

436 statement = statement.where(or_(*language_options)) 

437 

438 if request.HasField("profile_completed"): 

439 statement = statement.where(User.has_completed_profile == request.profile_completed.value) 

440 if request.HasField("guests"): 

441 statement = statement.where(User.max_guests >= request.guests.value) 

442 if request.HasField("last_minute"): 

443 statement = statement.where(User.last_minute == request.last_minute.value) 

444 if request.HasField("has_pets"): 

445 statement = statement.where(User.has_pets == request.has_pets.value) 

446 if request.HasField("accepts_pets"): 

447 statement = statement.where(User.accepts_pets == request.accepts_pets.value) 

448 if request.HasField("has_kids"): 

449 statement = statement.where(User.has_kids == request.has_kids.value) 

450 if request.HasField("accepts_kids"): 

451 statement = statement.where(User.accepts_kids == request.accepts_kids.value) 

452 if request.HasField("has_housemates"): 

453 statement = statement.where(User.has_housemates == request.has_housemates.value) 

454 if request.HasField("wheelchair_accessible"): 

455 statement = statement.where(User.wheelchair_accessible == request.wheelchair_accessible.value) 

456 if request.HasField("smokes_at_home"): 

457 statement = statement.where(User.smokes_at_home == request.smokes_at_home.value) 

458 if request.HasField("drinking_allowed"): 

459 statement = statement.where(User.drinking_allowed == request.drinking_allowed.value) 

460 if request.HasField("drinks_at_home"): 

461 statement = statement.where(User.drinks_at_home == request.drinks_at_home.value) 

462 if request.HasField("parking"): 

463 statement = statement.where(User.parking == request.parking.value) 

464 if request.HasField("camping_ok"): 

465 statement = statement.where(User.camping_ok == request.camping_ok.value) 

466 

467 if request.HasField("search_in_area"): 

468 # EPSG4326 measures distance in decimal degress 

469 # we want to check whether two circles overlap, so check if the distance between their centers is less 

470 # than the sum of their radii, divided by 111111 m ~= 1 degree (at the equator) 

471 search_point = create_coordinate(request.search_in_area.lat, request.search_in_area.lng) 

472 statement = statement.where( 

473 func.ST_DWithin( 

474 # old: 

475 # User.geom, search_point, (User.geom_radius + request.search_in_area.radius) / 111111 

476 # this is an optimization that speeds up the db queries since it doesn't need to look up the user's geom radius 

477 User.geom, 

478 search_point, 

479 (1000 + request.search_in_area.radius) / 111111, 

480 ) 

481 ) 

482 if request.HasField("search_in_rectangle"): 

483 statement = statement.where( 

484 func.ST_Within( 

485 User.geom, 

486 func.ST_MakeEnvelope( 

487 request.search_in_rectangle.lng_min, 

488 request.search_in_rectangle.lat_min, 

489 request.search_in_rectangle.lng_max, 

490 request.search_in_rectangle.lat_max, 

491 4326, 

492 ), 

493 ) 

494 ) 

495 if request.HasField("search_in_community_id"): 

496 # could do a join here as well, but this is just simpler 

497 node = session.execute(select(Node).where(Node.id == request.search_in_community_id)).scalar_one_or_none() 

498 if not node: 

499 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

500 statement = statement.where(func.ST_Contains(node.geom, User.geom)) 

501 

502 if request.only_with_references: 

503 references = ( 

504 select(Reference.to_user_id.label("user_id")) 

505 .where_users_column_visible(context, Reference.from_user_id) 

506 .distinct() 

507 .subquery() 

508 ) 

509 statement = statement.join(references, references.c.user_id == User.id) 

510 

511 if request.only_with_strong_verification: 

512 statement = statement.join( 

513 StrongVerificationAttempt, 

514 and_( 

515 StrongVerificationAttempt.user_id == User.id, 

516 StrongVerificationAttempt.has_strong_verification(User), 

517 ), 

518 ) 

519 # TODO: 

520 # bool friends_only = 13; 

521 

522 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

523 next_recommendation_score = float(decrypt_page_token(request.page_token)) if request.page_token else 1e10 

524 total_items = session.execute(select(func.count()).select_from(statement.subquery())).scalar() 

525 

526 statement = ( 

527 statement.where(User.recommendation_score <= next_recommendation_score) 

528 .order_by(User.recommendation_score.desc()) 

529 .limit(page_size + 1) 

530 ) 

531 res = session.execute(statement).all() 

532 if res: 

533 users, rec_scores = zip(*res) 

534 else: 

535 users = [] 

536 next_page_token = encrypt_page_token(str(rec_scores[-1])) if len(users) > page_size else None 

537 return users[:page_size], next_page_token, total_items 

538 

539 

540class Search(search_pb2_grpc.SearchServicer): 

541 def Search(self, request, context, session): 

542 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

543 # this is not an ideal page token, some results have equal rank (unlikely) 

544 next_rank = float(request.page_token) if request.page_token else None 

545 

546 all_results = ( 

547 _search_users( 

548 session, 

549 request.query, 

550 request.title_only, 

551 next_rank, 

552 page_size, 

553 context, 

554 request.include_users, 

555 ) 

556 + _search_pages( 

557 session, 

558 request.query, 

559 request.title_only, 

560 next_rank, 

561 page_size, 

562 context, 

563 request.include_places, 

564 request.include_guides, 

565 ) 

566 + _search_events( 

567 session, 

568 request.query, 

569 request.title_only, 

570 next_rank, 

571 page_size, 

572 context, 

573 ) 

574 + _search_clusters( 

575 session, 

576 request.query, 

577 request.title_only, 

578 next_rank, 

579 page_size, 

580 context, 

581 request.include_communities, 

582 request.include_groups, 

583 ) 

584 ) 

585 all_results.sort(key=lambda result: result.rank, reverse=True) 

586 return search_pb2.SearchRes( 

587 results=all_results[:page_size], 

588 next_page_token=str(all_results[page_size].rank) if len(all_results) > page_size else None, 

589 ) 

590 

591 def UserSearch(self, request, context, session): 

592 user_ids_to_return, next_page_token, total_items = _user_search_inner(request, context, session) 

593 

594 user_ids_to_users = dict(session.execute(select(User.id, User).where(User.id.in_(user_ids_to_return))).all()) 

595 

596 return search_pb2.UserSearchRes( 

597 results=[ 

598 search_pb2.Result( 

599 rank=1, 

600 user=user_model_to_pb(user_ids_to_users[user_id], session, context), 

601 ) 

602 for user_id in user_ids_to_return 

603 ], 

604 next_page_token=next_page_token, 

605 total_items=total_items, 

606 ) 

607 

608 def UserSearchV2(self, request, context, session): 

609 user_ids_to_return, next_page_token, total_items = _user_search_inner(request, context, session) 

610 

611 LiteUser_by_id = { 

612 lite_user.id: lite_user 

613 for lite_user in session.execute(select(LiteUser).where(LiteUser.id.in_(user_ids_to_return))) 

614 .scalars() 

615 .all() 

616 } 

617 

618 response_rate_by_id = { 

619 resp_rate.user_id: resp_rate 

620 for resp_rate in session.execute( 

621 select(UserResponseRate).where(UserResponseRate.user_id.in_(user_ids_to_return)) 

622 ) 

623 .scalars() 

624 .all() 

625 } 

626 

627 db_user_data_by_id = { 

628 user_id: (about_me, gender, last_active, hosting_status, meetup_status, joined) 

629 for user_id, about_me, gender, last_active, hosting_status, meetup_status, joined in session.execute( 

630 select( 

631 User.id, 

632 User.about_me, 

633 User.gender, 

634 User.last_active, 

635 User.hosting_status, 

636 User.meetup_status, 

637 User.joined, 

638 ).where(User.id.in_(user_ids_to_return)) 

639 ).all() 

640 } 

641 

642 ref_counts_by_user_id = get_num_references(session, user_ids_to_return) 

643 

644 def _user_to_search_user(user_id): 

645 lite_user = LiteUser_by_id[user_id] 

646 

647 about_me, gender, last_active, hosting_status, meetup_status, joined = db_user_data_by_id[user_id] 

648 

649 lat, lng = get_coordinates(lite_user.geom) 

650 return search_pb2.SearchUser( 

651 user_id=lite_user.id, 

652 username=lite_user.username, 

653 name=lite_user.name, 

654 city=lite_user.city, 

655 joined=Timestamp_from_datetime(last_active_coarsen(joined)), 

656 has_completed_profile=lite_user.has_completed_profile, 

657 has_completed_my_home=lite_user.has_completed_my_home, 

658 lat=lat, 

659 lng=lng, 

660 profile_snippet=about_me, 

661 num_references=ref_counts_by_user_id.get(lite_user.id, 0), 

662 gender=gender, 

663 age=int(lite_user.age), 

664 last_active=Timestamp_from_datetime(last_active_coarsen(last_active)), 

665 hosting_status=hostingstatus2api[hosting_status], 

666 meetup_status=meetupstatus2api[meetup_status], 

667 avatar_url=urls.media_url(filename=lite_user.avatar_filename, size="full") 

668 if lite_user.avatar_filename 

669 else None, 

670 avatar_thumbnail_url=urls.media_url(filename=lite_user.avatar_filename, size="thumbnail") 

671 if lite_user.avatar_filename 

672 else None, 

673 has_strong_verification=lite_user.has_strong_verification, 

674 **response_rate_to_pb(response_rate_by_id.get(user_id)), 

675 ) 

676 

677 results = reranker([_user_to_search_user(user_id) for user_id in user_ids_to_return]) 

678 

679 return search_pb2.UserSearchV2Res( 

680 results=results, 

681 next_page_token=next_page_token, 

682 total_items=total_items, 

683 ) 

684 

685 def EventSearch(self, request, context, session): 

686 statement = ( 

687 select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted) 

688 ) 

689 

690 if request.HasField("query"): 

691 if request.query_title_only: 

692 statement = statement.where(Event.title.ilike(f"%{request.query.value}%")) 

693 else: 

694 statement = statement.where( 

695 or_( 

696 Event.title.ilike(f"%{request.query.value}%"), 

697 EventOccurrence.content.ilike(f"%{request.query.value}%"), 

698 EventOccurrence.address.ilike(f"%{request.query.value}%"), 

699 ) 

700 ) 

701 

702 if request.only_online: 

703 statement = statement.where(EventOccurrence.geom == None) 

704 elif request.only_offline: 

705 statement = statement.where(EventOccurrence.geom != None) 

706 

707 if request.subscribed or request.attending or request.organizing or request.my_communities: 

708 where_ = [] 

709 

710 if request.subscribed: 

711 statement = statement.outerjoin( 

712 EventSubscription, 

713 and_(EventSubscription.event_id == Event.id, EventSubscription.user_id == context.user_id), 

714 ) 

715 where_.append(EventSubscription.user_id != None) 

716 if request.organizing: 

717 statement = statement.outerjoin( 

718 EventOrganizer, 

719 and_(EventOrganizer.event_id == Event.id, EventOrganizer.user_id == context.user_id), 

720 ) 

721 where_.append(EventOrganizer.user_id != None) 

722 if request.attending: 

723 statement = statement.outerjoin( 

724 EventOccurrenceAttendee, 

725 and_( 

726 EventOccurrenceAttendee.occurrence_id == EventOccurrence.id, 

727 EventOccurrenceAttendee.user_id == context.user_id, 

728 ), 

729 ) 

730 where_.append(EventOccurrenceAttendee.user_id != None) 

731 if request.my_communities: 

732 my_communities = ( 

733 session.execute( 

734 select(Node.id) 

735 .join(Cluster, Cluster.parent_node_id == Node.id) 

736 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id) 

737 .where(ClusterSubscription.user_id == context.user_id) 

738 .where(Cluster.is_official_cluster) 

739 .order_by(Node.id) 

740 .limit(100000) 

741 ) 

742 .scalars() 

743 .all() 

744 ) 

745 where_.append(Event.parent_node_id.in_(my_communities)) 

746 

747 statement = statement.where(or_(*where_)) 

748 

749 if not request.include_cancelled: 

750 statement = statement.where(~EventOccurrence.is_cancelled) 

751 

752 if request.HasField("search_in_area"): 

753 # EPSG4326 measures distance in decimal degress 

754 # we want to check whether two circles overlap, so check if the distance between their centers is less 

755 # than the sum of their radii, divided by 111111 m ~= 1 degree (at the equator) 

756 search_point = create_coordinate(request.search_in_area.lat, request.search_in_area.lng) 

757 statement = statement.where( 

758 func.ST_DWithin( 

759 # old: 

760 # User.geom, search_point, (User.geom_radius + request.search_in_area.radius) / 111111 

761 # this is an optimization that speeds up the db queries since it doesn't need to look up the user's geom radius 

762 EventOccurrence.geom, 

763 search_point, 

764 (1000 + request.search_in_area.radius) / 111111, 

765 ) 

766 ) 

767 if request.HasField("search_in_rectangle"): 

768 statement = statement.where( 

769 func.ST_Within( 

770 EventOccurrence.geom, 

771 func.ST_MakeEnvelope( 

772 request.search_in_rectangle.lng_min, 

773 request.search_in_rectangle.lat_min, 

774 request.search_in_rectangle.lng_max, 

775 request.search_in_rectangle.lat_max, 

776 4326, 

777 ), 

778 ) 

779 ) 

780 if request.HasField("search_in_community_id"): 

781 # could do a join here as well, but this is just simpler 

782 node = session.execute(select(Node).where(Node.id == request.search_in_community_id)).scalar_one_or_none() 

783 if not node: 

784 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

785 statement = statement.where(func.ST_Contains(node.geom, EventOccurrence.geom)) 

786 

787 if request.HasField("after"): 

788 statement = statement.where(EventOccurrence.start_time > to_aware_datetime(request.after)) 

789 if request.HasField("before"): 

790 statement = statement.where(EventOccurrence.end_time < to_aware_datetime(request.before)) 

791 

792 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

793 # the page token is a unix timestamp of where we left off 

794 page_token = ( 

795 dt_from_millis(int(request.page_token)) if request.page_token and not request.page_number else now() 

796 ) 

797 page_number = request.page_number or 1 

798 # Calculate the offset for pagination 

799 offset = (page_number - 1) * page_size 

800 

801 if not request.past: 

802 statement = statement.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by( 

803 EventOccurrence.start_time.asc() 

804 ) 

805 else: 

806 statement = statement.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by( 

807 EventOccurrence.start_time.desc() 

808 ) 

809 

810 total_items = session.execute(select(func.count()).select_from(statement.subquery())).scalar() 

811 # Apply pagination by page number 

812 statement = statement.offset(offset).limit(page_size) if request.page_number else statement.limit(page_size + 1) 

813 occurrences = session.execute(statement).scalars().all() 

814 

815 return search_pb2.EventSearchRes( 

816 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]], 

817 next_page_token=(str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None), 

818 total_items=total_items, 

819 )