Coverage for src/couchers/servicers/search.py: 85%

269 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-07-03 04:21 +0000

1""" 

2See //docs/search.md for overview. 

3""" 

4 

5from datetime import timedelta 

6 

7import grpc 

8from sqlalchemy.sql import and_, func, or_ 

9 

10from couchers import errors, urls 

11from couchers.crypto import decrypt_page_token, encrypt_page_token 

12from couchers.materialized_views import lite_users, user_response_rates 

13from couchers.models import ( 

14 Cluster, 

15 ClusterSubscription, 

16 Event, 

17 EventOccurrence, 

18 EventOccurrenceAttendee, 

19 EventOrganizer, 

20 EventSubscription, 

21 LanguageAbility, 

22 Node, 

23 Page, 

24 PageType, 

25 PageVersion, 

26 Reference, 

27 StrongVerificationAttempt, 

28 User, 

29) 

30from couchers.servicers.account import has_strong_verification 

31from couchers.servicers.api import ( 

32 fluency2sql, 

33 get_num_references, 

34 hostingstatus2api, 

35 hostingstatus2sql, 

36 meetupstatus2api, 

37 meetupstatus2sql, 

38 parkingdetails2sql, 

39 response_rate_to_pb, 

40 sleepingarrangement2sql, 

41 smokinglocation2sql, 

42 user_model_to_pb, 

43) 

44from couchers.servicers.communities import community_to_pb 

45from couchers.servicers.events import event_to_pb 

46from couchers.servicers.groups import group_to_pb 

47from couchers.servicers.pages import page_to_pb 

48from couchers.sql import couchers_select as select 

49from couchers.utils import ( 

50 Timestamp_from_datetime, 

51 create_coordinate, 

52 dt_from_millis, 

53 get_coordinates, 

54 last_active_coarsen, 

55 millis_from_dt, 

56 now, 

57 to_aware_datetime, 

58) 

59from proto import search_pb2, search_pb2_grpc 

60 

# searches are a bit expensive, we'd rather send back a bunch of results at once than lots of small pages
# (this is both the upper bound and the default page size of the search RPCs in this module)
MAX_PAGINATION_LENGTH = 100

# text search configuration name handed to to_tsvector, websearch_to_tsquery and ts_headline
REGCONFIG = "english"
# a row counts as a trigram match when word_similarity(query, title) is strictly above this
TRI_SIMILARITY_THRESHOLD = 0.6
# factor the trigram similarity is multiplied by before ts_rank_cd is added to form the rank
TRI_SIMILARITY_WEIGHT = 5

67 

68 

69def _join_with_space(coalesces): 

70 # the objects in coalesces are not strings, so we can't do " ".join(coalesces). They're SQLAlchemy magic. 

71 if not coalesces: 

72 return "" 

73 out = coalesces[0] 

74 for coalesce in coalesces[1:]: 

75 out += " " + coalesce 

76 return out 

77 

78 

def _build_tsv(A, B=None, C=None, D=None):
    """
    Builds one weighted tsvector expression out of up to four lists of fields.

    Fields in A are given weight "A", fields in B weight "B", and so on. Each field goes through coalesce(field, "")
    before the fields of a list are joined with spaces (presumably so a NULL column cannot blank the document --
    confirm). A always contributes, even when empty; B, C and D only contribute when they are non-empty.
    """

    def weighted(fields, weight):
        text = _join_with_space([func.coalesce(field, "") for field in fields])
        return func.setweight(func.to_tsvector(REGCONFIG, text), weight)

    tsv = weighted(A, "A")
    for fields, weight in ((B, "B"), (C, "C"), (D, "D")):
        # None and [] are both skipped
        if fields:
            tsv = tsv.concat(weighted(fields, weight))
    return tsv

100 

101 

def _build_doc(A, B=None, C=None, D=None):
    """
    Builds the plain document expression (no to_tsvector, no weights) that the snippet is extracted from.

    The fields of A come first, followed by those of B, C and D for whichever of them are non-empty; everything is
    separated by single spaces and each field is wrapped in coalesce(field, "").
    """

    def section(fields):
        return _join_with_space([func.coalesce(field, "") for field in fields])

    doc = section(A)
    for fields in (B, C, D):
        # None and [] are both skipped
        if fields:
            doc = doc + (" " + section(fields))
    return doc

117 

118 

def _similarity(statement, text):
    """
    Gives the word_similarity expression of the search string against the text, both passed through unaccent first.
    """
    needle = func.unaccent(statement)
    haystack = func.unaccent(text)
    return func.word_similarity(needle, haystack)

121 

122 

def _gen_search_elements(statement, title_only, next_rank, page_size, A, B=None, C=None, D=None):
    """
    Given an sql statement and four sets of fields, (A, B, C, D), generates a bunch of postgres expressions for full
    text search.

    The four sets are in decreasing order of "importance" for ranking.

    A should be the "title", the others can be anything.

    If title_only=True, we only perform a trigram search against A only; the other sets are then used for nothing
    but the snippet.

    Returns a tuple (rank, snippet, execute_search_statement):

    * rank: expression labelled "rank" to put in the select list
    * snippet: expression labelled "snippet" to put in the select list
    * execute_search_statement(session, orig_statement): runs orig_statement restricted to matching rows with rank
      at most next_rank (when next_rank is not None), best rank first, and returns up to page_size + 1 rows
    """
    B = B or []
    C = C or []
    D = D or []

    # a postgres tsquery object; it highlights the snippet in both modes and matches against the tsvector in the
    # full search mode
    tsq = func.websearch_to_tsquery(REGCONFIG, statement)

    # document to generate snippet from
    doc = _build_doc(A, B, C, D)

    # the snippet with results highlighted
    snippet = func.ts_headline(REGCONFIG, doc, tsq, "StartSel=**,StopSel=**").label("snippet")

    # trigram based text similarity between title and sql statement string
    title = _build_doc(A)
    sim = _similarity(statement, title)

    if title_only:
        # only the title similarity counts, both for ranking and for deciding what matches
        rank = sim.label("rank")
        matches = sim > TRI_SIMILARITY_THRESHOLD
    else:
        # the tsvector object that we want to search against with our tsquery
        tsv = _build_tsv(A, B, C, D)

        # ranking algo, weigh the similarity a lot, the text-based ranking less
        rank = (TRI_SIMILARITY_WEIGHT * sim + func.ts_rank_cd(tsv, tsq)).label("rank")
        matches = or_(tsv.op("@@")(tsq), sim > TRI_SIMILARITY_THRESHOLD)

    def execute_search_statement(session, orig_statement):
        """
        Does the right search filtering, limiting, and ordering for the initial statement
        """
        query = orig_statement.where(matches)
        if next_rank is not None:
            # resume from where the previous page stopped
            query = query.where(rank <= next_rank)
        # one row more than a page, so the caller can tell whether another page exists
        return session.execute(query.order_by(rank.desc()).limit(page_size + 1)).all()

    return rank, snippet, execute_search_statement

196 

197 

def _search_users(session, search_statement, title_only, next_rank, page_size, context, include_users):
    """
    Runs the text search over users and wraps each hit in a search_pb2.Result.

    Returns an empty list straight away when include_users is false. Otherwise username and name act as the title,
    followed by city, then about_me, then the remaining free text profile fields. Only users that pass
    where_users_visible(context) are considered.
    """
    if not include_users:
        return []

    title_fields = [User.username, User.name]
    location_fields = [User.city]
    about_fields = [User.about_me]
    other_fields = [User.things_i_like, User.about_place, User.additional_information]

    rank, snippet, execute_search_statement = _gen_search_elements(
        search_statement, title_only, next_rank, page_size, title_fields, location_fields, about_fields, other_fields
    )

    visible_users = select(User, rank, snippet).where_users_visible(context)
    rows = execute_search_statement(session, visible_users)

    results = []
    for matched_user, user_rank, user_snippet in rows:
        results.append(
            search_pb2.Result(
                rank=user_rank,
                user=user_model_to_pb(matched_user, session, context),
                snippet=user_snippet,
            )
        )
    return results

222 

223 

def _search_pages(session, search_statement, title_only, next_rank, page_size, context, include_places, include_guides):
    """
    Runs the text search over the latest version of place and/or guide pages.

    Returns an empty list when neither include_places nor include_guides is set. Otherwise only the newest
    PageVersion (highest id) of each page of a requested type is searched, with the title as the title field, then
    the address, then the content. Each hit becomes a search_pb2.Result with either `place` or `guide` filled in,
    depending on the page's type.
    """
    # nothing was asked for: leave before building any expressions, like _search_users and _search_clusters do
    if not include_places and not include_guides:
        return []

    rank, snippet, execute_search_statement = _gen_search_elements(
        search_statement,
        title_only,
        next_rank,
        page_size,
        [PageVersion.title],
        [PageVersion.address],
        [],
        [PageVersion.content],
    )

    # id of the most recent version of every page that has one of the requested types
    latest_pages = (
        select(func.max(PageVersion.id).label("id"))
        .join(Page, Page.id == PageVersion.page_id)
        .where(
            or_(
                (Page.type == PageType.place) if include_places else False,
                (Page.type == PageType.guide) if include_guides else False,
            )
        )
        .group_by(PageVersion.page_id)
        .subquery()
    )

    pages = execute_search_statement(
        session,
        select(Page, rank, snippet)
        .join(PageVersion, PageVersion.page_id == Page.id)
        .join(latest_pages, latest_pages.c.id == PageVersion.id),
    )

    return [
        search_pb2.Result(
            rank=rank,
            place=page_to_pb(session, page, context) if page.type == PageType.place else None,
            guide=page_to_pb(session, page, context) if page.type == PageType.guide else None,
            snippet=snippet,
        )
        for page, rank, snippet in pages
    ]

267 

268 

def _search_events(session, search_statement, title_only, next_rank, page_size, context):
    """
    Runs the text search over event occurrences that have not ended yet and wraps each hit in a search_pb2.Result.

    The event title acts as the title field, followed by the occurrence's address and link, then its content.
    """
    # NOTE(review): EventSearch in this module also filters on ~EventOccurrence.is_deleted, this function does
    # not -- confirm whether that is intended
    title_fields = [Event.title]
    where_fields = [EventOccurrence.address, EventOccurrence.link]
    body_fields = [EventOccurrence.content]

    rank, snippet, execute_search_statement = _gen_search_elements(
        search_statement, title_only, next_rank, page_size, title_fields, where_fields, [], body_fields
    )

    not_yet_over = (
        select(EventOccurrence, rank, snippet)
        .join(Event, Event.id == EventOccurrence.event_id)
        .where(EventOccurrence.end_time >= func.now())
    )
    rows = execute_search_statement(session, not_yet_over)

    results = []
    for occurrence, occurrence_rank, occurrence_snippet in rows:
        results.append(
            search_pb2.Result(
                rank=occurrence_rank,
                event=event_to_pb(session, occurrence, context),
                snippet=occurrence_snippet,
            )
        )
    return results

296 

297 

def _search_clusters(
    session, search_statement, title_only, next_rank, page_size, context, include_communities, include_groups
):
    """
    Runs the text search over clusters, i.e. communities (official clusters) and groups (all other clusters).

    Returns an empty list when neither kind is requested. The cluster name acts as the title field; the address and
    title of the newest version of the cluster's main page come next, then the cluster description, then the main
    page content. When exactly one kind is requested the query is restricted to it. Each hit becomes a
    search_pb2.Result with either `community` or `group` filled in.
    """
    if not (include_communities or include_groups):
        return []

    only_communities = include_communities and not include_groups
    only_groups = include_groups and not include_communities

    rank, snippet, execute_search_statement = _gen_search_elements(
        search_statement,
        title_only,
        next_rank,
        page_size,
        [Cluster.name],
        [PageVersion.address, PageVersion.title],
        [Cluster.description],
        [PageVersion.content],
    )

    # id of the most recent version of every main page
    newest_main_page_versions = (
        select(func.max(PageVersion.id).label("id"))
        .join(Page, Page.id == PageVersion.page_id)
        .where(Page.type == PageType.main_page)
        .group_by(PageVersion.page_id)
        .subquery()
    )

    query = (
        select(Cluster, rank, snippet)
        .join(Page, Page.owner_cluster_id == Cluster.id)
        .join(PageVersion, PageVersion.page_id == Page.id)
        .join(newest_main_page_versions, newest_main_page_versions.c.id == PageVersion.id)
    )
    # each of these two is a no-op (True) unless exactly that one kind was requested
    query = query.where(Cluster.is_official_cluster if only_communities else True)
    query = query.where(~Cluster.is_official_cluster if only_groups else True)

    results = []
    for cluster, cluster_rank, cluster_snippet in execute_search_statement(session, query):
        if cluster.is_official_cluster:
            community = community_to_pb(session, cluster.official_cluster_for_node, context)
            group = None
        else:
            community = None
            group = group_to_pb(session, cluster, context)
        results.append(
            search_pb2.Result(rank=cluster_rank, community=community, group=group, snippet=cluster_snippet)
        )
    return results

346 

347 

def _user_search_inner(request, context, session):
    """
    Shared implementation of the user search RPCs: works out which user ids match the request.

    Only users that are visible to the caller and present in lite_users are considered. When
    request.exactly_user_ids is non-empty, those ids are the only additional filter; otherwise every filter set on
    the request is applied.

    Results are ordered by recommendation score, highest first, and the page token is the encrypted recommendation
    score at which the next page starts.

    Returns a tuple (user_ids, next_page_token, total_items):

    * user_ids: at most one page of matching user ids
    * next_page_token: token for the following page, or None when there is none
    * total_items: number of rows matching the filters, counted before the page token cut-off is applied

    Aborts the RPC with FAILED_PRECONDITION when a gender filter is used by a caller without strong verification or
    does not include the caller's own gender, and with NOT_FOUND when search_in_community_id matches no node.
    """
    user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()

    # Base statement with visibility filter
    statement = select(User.id, User.recommendation_score).where_users_visible(context)
    # make sure that only users who are in lite_users show up
    statement = statement.join(lite_users, lite_users.c.id == User.id)

    # If exactly_user_ids is present, only filter by those IDs and ignore all other filters
    # This is a bit of a hacky feature to help with the frontend map implementation
    if len(request.exactly_user_ids) > 0:
        statement = statement.where(User.id.in_(request.exactly_user_ids))
    else:
        # Apply all the normal filters
        if request.HasField("query"):
            pattern = f"%{request.query.value}%"
            searched_columns = [User.name, User.username]
            if not request.query_name_only:
                searched_columns += [
                    User.city,
                    User.hometown,
                    User.about_me,
                    User.things_i_like,
                    User.about_place,
                    User.additional_information,
                ]
            statement = statement.where(or_(*[column.ilike(pattern) for column in searched_columns]))

        if request.HasField("last_active"):
            raw_dt = to_aware_datetime(request.last_active)
            statement = statement.where(User.last_active >= last_active_coarsen(raw_dt))

        if len(request.gender) > 0:
            if not has_strong_verification(session, user):
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NEED_STRONG_VERIFICATION)
            elif user.gender not in request.gender:
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.MUST_INCLUDE_OWN_GENDER)
            else:
                statement = statement.where(User.gender.in_(request.gender))

        # repeated enum filters: (requested api values, column, api value -> sql value mapping)
        enum_filters = (
            (request.hosting_status_filter, User.hosting_status, hostingstatus2sql),
            (request.meetup_status_filter, User.meetup_status, meetupstatus2sql),
            (request.smoking_location_filter, User.smoking_allowed, smokinglocation2sql),
            (request.sleeping_arrangement_filter, User.sleeping_arrangement, sleepingarrangement2sql),
            (request.parking_details_filter, User.parking_details, parkingdetails2sql),
        )
        for wanted, column, to_sql in enum_filters:
            if len(wanted) > 0:
                statement = statement.where(column.in_([to_sql[value] for value in wanted]))

        # limits/default could be handled on the front end as well
        min_age = request.age_min.value if request.HasField("age_min") else 18
        max_age = request.age_max.value if request.HasField("age_max") else 200

        statement = statement.where((User.age >= min_age) & (User.age <= max_age))

        # return results with by language code as only input
        # fluency in conversational or fluent
        if len(request.language_ability_filter) > 0:
            language_options = []
            for ability_filter in request.language_ability_filter:
                fluency_sql_value = fluency2sql.get(ability_filter.fluency)
                if fluency_sql_value is None:
                    # fluency without an sql equivalent, this entry cannot match anything
                    continue
                language_options.append(
                    and_(
                        LanguageAbility.language_code == ability_filter.code,
                        LanguageAbility.fluency >= fluency_sql_value,
                    )
                )
            # with no usable entries there is nothing to filter on (and or_() without arguments is deprecated)
            if language_options:
                # Join against the DISTINCT set of matching user ids instead of against LanguageAbility itself:
                # a user who satisfies several of the requested languages would otherwise come back once per
                # matching language, which repeats them in the results and inflates total_items.
                speakers = (
                    select(LanguageAbility.user_id.label("user_id"))
                    .where(or_(*language_options))
                    .distinct()
                    .subquery()
                )
                statement = statement.join(speakers, speakers.c.user_id == User.id)

        if request.HasField("guests"):
            statement = statement.where(User.max_guests >= request.guests.value)

        # optional wrapper fields that, when set, must be equal to the given column: (request field, column)
        exact_match_filters = (
            ("profile_completed", User.has_completed_profile),
            ("last_minute", User.last_minute),
            ("has_pets", User.has_pets),
            ("accepts_pets", User.accepts_pets),
            ("has_kids", User.has_kids),
            ("accepts_kids", User.accepts_kids),
            ("has_housemates", User.has_housemates),
            ("wheelchair_accessible", User.wheelchair_accessible),
            ("smokes_at_home", User.smokes_at_home),
            ("drinking_allowed", User.drinking_allowed),
            ("drinks_at_home", User.drinks_at_home),
            ("parking", User.parking),
            ("camping_ok", User.camping_ok),
        )
        for field_name, column in exact_match_filters:
            if request.HasField(field_name):
                statement = statement.where(column == getattr(request, field_name).value)

        if request.HasField("search_in_area"):
            # EPSG4326 measures distance in decimal degress
            # we want to check whether two circles overlap, so check if the distance between their centers is less
            # than the sum of their radii, divided by 111111 m ~= 1 degree (at the equator)
            search_point = create_coordinate(request.search_in_area.lat, request.search_in_area.lng)
            statement = statement.where(
                func.ST_DWithin(
                    # old:
                    # User.geom, search_point, (User.geom_radius + request.search_in_area.radius) / 111111
                    # this is an optimization that speeds up the db queries since it doesn't need to look up the
                    # user's geom radius
                    User.geom,
                    search_point,
                    (1000 + request.search_in_area.radius) / 111111,
                )
            )
        if request.HasField("search_in_rectangle"):
            statement = statement.where(
                func.ST_Within(
                    User.geom,
                    func.ST_MakeEnvelope(
                        request.search_in_rectangle.lng_min,
                        request.search_in_rectangle.lat_min,
                        request.search_in_rectangle.lng_max,
                        request.search_in_rectangle.lat_max,
                        4326,
                    ),
                )
            )
        if request.HasField("search_in_community_id"):
            # could do a join here as well, but this is just simpler
            node = session.execute(select(Node).where(Node.id == request.search_in_community_id)).scalar_one_or_none()
            if not node:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
            statement = statement.where(func.ST_Contains(node.geom, User.geom))

        if request.only_with_references:
            # users that have received a reference from a user who is visible to the caller
            references = (
                select(Reference.to_user_id.label("user_id"))
                .where_users_column_visible(context, Reference.from_user_id)
                .distinct()
                .subquery()
            )
            statement = statement.join(references, references.c.user_id == User.id)

        if request.only_with_strong_verification:
            statement = statement.join(
                StrongVerificationAttempt,
                and_(
                    StrongVerificationAttempt.user_id == User.id,
                    StrongVerificationAttempt.has_strong_verification(User),
                ),
            )
        # TODO:
        # bool friends_only = 13;

    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    # without a page token start above any score (1e10 is used as the "no upper bound" value)
    next_recommendation_score = float(decrypt_page_token(request.page_token)) if request.page_token else 1e10
    total_items = session.execute(select(func.count()).select_from(statement.subquery())).scalar()

    statement = (
        statement.where(User.recommendation_score <= next_recommendation_score)
        .order_by(User.recommendation_score.desc())
        # one more than a page: the extra row tells us there is a next page and where it starts
        .limit(page_size + 1)
    )
    res = session.execute(statement).all()
    if not res:
        return [], None, total_items

    user_ids, rec_scores = zip(*res)
    next_page_token = encrypt_page_token(str(rec_scores[-1])) if len(user_ids) > page_size else None
    return user_ids[:page_size], next_page_token, total_items

540 

541 

class Search(search_pb2_grpc.SearchServicer):
    """
    Servicer for the search RPCs: ranked text search across several kinds of content, filtered user search in two
    response formats, and filtered event search.
    """

    def Search(self, request, context, session):
        """
        Ranked text search across users, pages, events and clusters.

        Every kind is searched separately for up to page_size + 1 hits; the hits are then merged, sorted by rank
        (best first) and cut down to one page. The page token is the rank at which the next page starts.
        """
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        # this is not an ideal page token, some results have equal rank (unlikely)
        next_rank = float(request.page_token) if request.page_token else None

        # the leading arguments are the same for all four searches
        shared_args = (session, request.query, request.title_only, next_rank, page_size, context)

        all_results = (
            _search_users(*shared_args, request.include_users)
            + _search_pages(*shared_args, request.include_places, request.include_guides)
            + _search_events(*shared_args)
            + _search_clusters(*shared_args, request.include_communities, request.include_groups)
        )
        all_results.sort(key=lambda result: result.rank, reverse=True)

        has_more = len(all_results) > page_size
        return search_pb2.SearchRes(
            results=all_results[:page_size],
            # rank of the first result that did not fit on this page
            next_page_token=str(all_results[page_size].rank) if has_more else None,
        )

    def UserSearch(self, request, context, session):
        """
        Filtered user search that returns full user messages, in the order given by _user_search_inner.
        """
        matched_ids, next_page_token, total_items = _user_search_inner(request, context, session)

        id_and_user_rows = session.execute(select(User.id, User).where(User.id.in_(matched_ids))).all()
        users_by_id = {user_id: user for user_id, user in id_and_user_rows}

        results = []
        for user_id in matched_ids:
            results.append(
                search_pb2.Result(
                    rank=1,
                    user=user_model_to_pb(users_by_id[user_id], session, context),
                )
            )

        return search_pb2.UserSearchRes(
            results=results,
            next_page_token=next_page_token,
            total_items=total_items,
        )

    def UserSearchV2(self, request, context, session):
        """
        Filtered user search that returns SearchUser messages, in the order given by _user_search_inner.

        The messages are assembled from the lite_users and user_response_rates views, a handful of User columns and
        the reference counts, each fetched with one query for the whole page.
        """
        matched_ids, next_page_token, total_items = _user_search_inner(request, context, session)

        lite_rows = session.execute(select(lite_users).where(lite_users.c.id.in_(matched_ids))).all()
        lite_users_by_id = {lite_row.id: lite_row for lite_row in lite_rows}

        rate_rows = session.execute(
            select(user_response_rates).where(user_response_rates.c.user_id.in_(matched_ids))
        ).all()
        response_rates_by_id = {rate_row.user_id: rate_row for rate_row in rate_rows}

        profile_rows = session.execute(
            select(
                User.id, User.about_me, User.gender, User.last_active, User.hosting_status, User.meetup_status
            ).where(User.id.in_(matched_ids))
        ).all()
        # user id -> (about_me, gender, last_active, hosting_status, meetup_status)
        profile_data_by_id = {
            row_user_id: (about_me, gender, last_active, hosting_status, meetup_status)
            for row_user_id, about_me, gender, last_active, hosting_status, meetup_status in profile_rows
        }

        ref_counts_by_user_id = get_num_references(session, matched_ids)

        def _user_to_search_user(user_id):
            lite_user = lite_users_by_id[user_id]
            about_me, gender, last_active, hosting_status, meetup_status = profile_data_by_id[user_id]
            lat, lng = get_coordinates(lite_user.geom)

            # no avatar file means no urls at all
            if lite_user.avatar_filename:
                avatar_url = urls.media_url(filename=lite_user.avatar_filename, size="full")
                avatar_thumbnail_url = urls.media_url(filename=lite_user.avatar_filename, size="thumbnail")
            else:
                avatar_url = None
                avatar_thumbnail_url = None

            return search_pb2.SearchUser(
                user_id=lite_user.id,
                username=lite_user.username,
                name=lite_user.name,
                city=lite_user.city,
                has_completed_profile=lite_user.has_completed_profile,
                lat=lat,
                lng=lng,
                profile_snippet=about_me,
                # users missing from the counts have no references
                num_references=ref_counts_by_user_id.get(lite_user.id, 0),
                gender=gender,
                age=int(lite_user.age),
                last_active=Timestamp_from_datetime(last_active_coarsen(last_active)),
                hosting_status=hostingstatus2api[hosting_status],
                meetup_status=meetupstatus2api[meetup_status],
                avatar_url=avatar_url,
                avatar_thumbnail_url=avatar_thumbnail_url,
                has_strong_verification=lite_user.has_strong_verification,
                **response_rate_to_pb(response_rates_by_id.get(user_id)),
            )

        return search_pb2.UserSearchV2Res(
            results=[_user_to_search_user(user_id) for user_id in matched_ids],
            next_page_token=next_page_token,
            total_items=total_items,
        )

    def EventSearch(self, request, context, session):
        """
        Filtered search over event occurrences that are not deleted.

        Supports a substring query, online/offline, the caller's involvement (subscribed, organizing, attending, in
        one of their communities; any one of the requested ones is enough), cancelled events, area, rectangle and
        community restrictions, and an after/before time window.

        Paging works either by page_number (offset based) or by page_token (the end time of the last fetched
        occurrence, in unix milliseconds); page_number wins when both are given. Upcoming events are listed by
        start time ascending, past events (request.past) by start time descending.
        """
        statement = (
            select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted)
        )

        if request.HasField("query"):
            pattern = f"%{request.query.value}%"
            if request.query_title_only:
                text_match = Event.title.ilike(pattern)
            else:
                text_match = or_(
                    Event.title.ilike(pattern),
                    EventOccurrence.content.ilike(pattern),
                    EventOccurrence.address.ilike(pattern),
                )
            statement = statement.where(text_match)

        # online events are the ones without a geom; the comparisons with None are SQLAlchemy null checks
        if request.only_online:
            statement = statement.where(EventOccurrence.geom == None)
        elif request.only_offline:
            statement = statement.where(EventOccurrence.geom != None)

        if request.subscribed or request.attending or request.organizing or request.my_communities:
            # conditions of which at least one has to hold
            involvement = []

            # each outer join only brings in the caller's own row (if any), hence the user_id null checks
            if request.subscribed:
                statement = statement.outerjoin(
                    EventSubscription,
                    and_(EventSubscription.event_id == Event.id, EventSubscription.user_id == context.user_id),
                )
                involvement.append(EventSubscription.user_id != None)
            if request.organizing:
                statement = statement.outerjoin(
                    EventOrganizer,
                    and_(EventOrganizer.event_id == Event.id, EventOrganizer.user_id == context.user_id),
                )
                involvement.append(EventOrganizer.user_id != None)
            if request.attending:
                statement = statement.outerjoin(
                    EventOccurrenceAttendee,
                    and_(
                        EventOccurrenceAttendee.occurrence_id == EventOccurrence.id,
                        EventOccurrenceAttendee.user_id == context.user_id,
                    ),
                )
                involvement.append(EventOccurrenceAttendee.user_id != None)
            if request.my_communities:
                # nodes whose official cluster the caller is subscribed to
                my_community_ids = (
                    session.execute(
                        select(Node.id)
                        .join(Cluster, Cluster.parent_node_id == Node.id)
                        .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
                        .where(ClusterSubscription.user_id == context.user_id)
                        .where(Cluster.is_official_cluster)
                        .order_by(Node.id)
                        .limit(100000)
                    )
                    .scalars()
                    .all()
                )
                involvement.append(Event.parent_node_id.in_(my_community_ids))

            statement = statement.where(or_(*involvement))

        if not request.include_cancelled:
            statement = statement.where(~EventOccurrence.is_cancelled)

        if request.HasField("search_in_area"):
            # EPSG4326 measures distance in decimal degrees, so the distance in metres is divided by
            # 111111 m ~= 1 degree (at the equator); a fixed 1000 m is added on top of the requested radius
            search_point = create_coordinate(request.search_in_area.lat, request.search_in_area.lng)
            max_distance = (1000 + request.search_in_area.radius) / 111111
            statement = statement.where(func.ST_DWithin(EventOccurrence.geom, search_point, max_distance))
        if request.HasField("search_in_rectangle"):
            rectangle = request.search_in_rectangle
            envelope = func.ST_MakeEnvelope(
                rectangle.lng_min,
                rectangle.lat_min,
                rectangle.lng_max,
                rectangle.lat_max,
                4326,
            )
            statement = statement.where(func.ST_Within(EventOccurrence.geom, envelope))
        if request.HasField("search_in_community_id"):
            # could do a join here as well, but this is just simpler
            node = session.execute(select(Node).where(Node.id == request.search_in_community_id)).scalar_one_or_none()
            if not node:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
            statement = statement.where(func.ST_Contains(node.geom, EventOccurrence.geom))

        if request.HasField("after"):
            statement = statement.where(EventOccurrence.start_time > to_aware_datetime(request.after))
        if request.HasField("before"):
            statement = statement.where(EventOccurrence.end_time < to_aware_datetime(request.before))

        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        # the page token is a unix timestamp of where we left off; it is ignored when paging by page number
        if request.page_token and not request.page_number:
            page_token = dt_from_millis(int(request.page_token))
        else:
            page_token = now()
        page_number = request.page_number or 1
        # Calculate the offset for pagination
        offset = (page_number - 1) * page_size

        # one second of slack around the cut-off in either direction
        if request.past:
            statement = statement.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
                EventOccurrence.start_time.desc()
            )
        else:
            statement = statement.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
                EventOccurrence.start_time.asc()
            )

        total_items = session.execute(select(func.count()).select_from(statement.subquery())).scalar()

        if request.page_number:
            # Apply pagination by page number
            statement = statement.offset(offset).limit(page_size)
        else:
            # fetch one extra row so we can tell whether there is a next page
            statement = statement.limit(page_size + 1)
        occurrences = session.execute(statement).scalars().all()

        has_more = len(occurrences) > page_size
        return search_pb2.EventSearchRes(
            events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
            next_page_token=(str(millis_from_dt(occurrences[-1].end_time)) if has_more else None),
            total_items=total_items,
        )