Coverage for src/couchers/servicers/search.py: 85%

268 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-06-01 15:07 +0000

1""" 

2See //docs/search.md for overview. 

3""" 

4 

5from datetime import timedelta 

6 

7import grpc 

8from sqlalchemy.sql import and_, func, or_ 

9 

10from couchers import errors, urls 

11from couchers.crypto import decrypt_page_token, encrypt_page_token 

12from couchers.materialized_views import lite_users, user_response_rates 

13from couchers.models import ( 

14 Cluster, 

15 ClusterSubscription, 

16 Event, 

17 EventOccurrence, 

18 EventOccurrenceAttendee, 

19 EventOrganizer, 

20 EventSubscription, 

21 LanguageAbility, 

22 Node, 

23 Page, 

24 PageType, 

25 PageVersion, 

26 Reference, 

27 StrongVerificationAttempt, 

28 User, 

29) 

30from couchers.servicers.account import has_strong_verification 

31from couchers.servicers.api import ( 

32 fluency2sql, 

33 get_num_references, 

34 hostingstatus2api, 

35 hostingstatus2sql, 

36 meetupstatus2api, 

37 meetupstatus2sql, 

38 parkingdetails2sql, 

39 response_rate_to_pb, 

40 sleepingarrangement2sql, 

41 smokinglocation2sql, 

42 user_model_to_pb, 

43) 

44from couchers.servicers.communities import community_to_pb 

45from couchers.servicers.events import event_to_pb 

46from couchers.servicers.groups import group_to_pb 

47from couchers.servicers.pages import page_to_pb 

48from couchers.sql import couchers_select as select 

49from couchers.utils import ( 

50 Timestamp_from_datetime, 

51 create_coordinate, 

52 dt_from_millis, 

53 get_coordinates, 

54 last_active_coarsen, 

55 millis_from_dt, 

56 now, 

57 to_aware_datetime, 

58) 

59from proto import search_pb2, search_pb2_grpc 

60 

# Searches are fairly expensive, so we would rather send back one large batch of results than many small pages.
MAX_PAGINATION_LENGTH = 100

# Postgres text search configuration handed to to_tsvector / websearch_to_tsquery / ts_headline below
REGCONFIG = "english"
# A title only counts as a trigram match when its word similarity to the query exceeds this value
TRI_SIMILARITY_THRESHOLD = 0.6
# Factor applied to the trigram similarity when it is added to the full text rank
TRI_SIMILARITY_WEIGHT = 5

67 

68 

69def _join_with_space(coalesces): 

70 # the objects in coalesces are not strings, so we can't do " ".join(coalesces). They're SQLAlchemy magic. 

71 if not coalesces: 

72 return "" 

73 out = coalesces[0] 

74 for coalesce in coalesces[1:]: 

75 out += " " + coalesce 

76 return out 

77 

78 

def _build_tsv(A, B=None, C=None, D=None):
    """
    Builds a weighted tsvector expression out of up to four lists of fields.

    Fields in A get weight "A", fields in B weight "B", and so on. A is always included; B, C and D are only
    concatenated onto the result when they are non-empty. NULL fields are coalesced to the empty string.
    """

    def weighted(fields, weight):
        # one tsvector over the space-joined fields, tagged with the given weight letter
        text = _join_with_space([func.coalesce(field, "") for field in fields])
        return func.setweight(func.to_tsvector(REGCONFIG, text), weight)

    tsv = weighted(A, "A")
    for fields, weight in ((B, "B"), (C, "C"), (D, "D")):
        if fields:
            tsv = tsv.concat(weighted(fields, weight))
    return tsv

100 

101 

def _build_doc(A, B=None, C=None, D=None):
    """
    Builds the raw, unweighted document expression (no to_tsvector), which is what snippets are extracted from.

    All fields of A, then of B, C and D (where non-empty) are coalesced to "" and joined with single spaces.
    """

    def as_text(fields):
        return _join_with_space([func.coalesce(field, "") for field in fields])

    doc = as_text(A)
    for fields in (B, C, D):
        if fields:
            doc += " " + as_text(fields)
    return doc

117 

118 

def _similarity(statement, text):
    """
    Returns the SQL expression word_similarity(unaccent(statement), unaccent(text)).
    """
    needle = func.unaccent(statement)
    haystack = func.unaccent(text)
    return func.word_similarity(needle, haystack)

121 

122 

def _gen_search_elements(statement, title_only, next_rank, page_size, A, B=None, C=None, D=None):
    """
    Given a search string and four sets of fields (A, B, C, D), generates the postgres expressions needed for
    full text search.

    The four sets are in decreasing order of "importance" for ranking. A should be the "title", the others can
    be anything.

    If title_only=True, matching and ranking use only the trigram similarity against A; the other sets are then
    only used to build the snippet.

    Returns a tuple (rank, snippet, execute_search_statement):
    * rank: labelled ("rank") ranking expression to select
    * snippet: labelled ("snippet") ts_headline expression with matches wrapped in **
    * execute_search_statement(session, orig_statement): applies the match filter, the next_rank cutoff (if
      any), descending rank order and a limit of page_size + 1 to orig_statement, executes it and returns all rows
    """
    B, C, D = B or [], C or [], D or []

    # a postgres tsquery object; used for matching in the full search and for the headline in both modes
    tsq = func.websearch_to_tsquery(REGCONFIG, statement)

    # trigram based text similarity between the title and the search string
    sim = _similarity(statement, _build_doc(A))

    # the snippet with results highlighted, generated from the whole document
    snippet = func.ts_headline(REGCONFIG, _build_doc(A, B, C, D), tsq, "StartSel=**,StopSel=**").label("snippet")

    if title_only:
        # only the title similarity counts
        rank = sim.label("rank")
        matches = sim > TRI_SIMILARITY_THRESHOLD
    else:
        # the tsvector object that we want to search against with our tsquery
        tsv = _build_tsv(A, B, C, D)
        # ranking algo, weigh the similarity a lot, the text-based ranking less
        rank = (TRI_SIMILARITY_WEIGHT * sim + func.ts_rank_cd(tsv, tsq)).label("rank")
        matches = or_(tsv.op("@@")(tsq), sim > TRI_SIMILARITY_THRESHOLD)

    def execute_search_statement(session, orig_statement):
        """
        Does the right search filtering, limiting, and ordering for the initial statement
        """
        filtered = orig_statement.where(matches).where(rank <= next_rank if next_rank is not None else True)
        return session.execute(filtered.order_by(rank.desc()).limit(page_size + 1)).all()

    return rank, snippet, execute_search_statement

196 

197 

198def _search_users(session, search_statement, title_only, next_rank, page_size, context, include_users): 

199 if not include_users: 

200 return [] 

201 rank, snippet, execute_search_statement = _gen_search_elements( 

202 search_statement, 

203 title_only, 

204 next_rank, 

205 page_size, 

206 [User.username, User.name], 

207 [User.city], 

208 [User.about_me], 

209 [User.things_i_like, User.about_place, User.additional_information], 

210 ) 

211 

212 users = execute_search_statement(session, select(User, rank, snippet).where_users_visible(context)) 

213 

214 return [ 

215 search_pb2.Result( 

216 rank=rank, 

217 user=user_model_to_pb(page, session, context), 

218 snippet=snippet, 

219 ) 

220 for page, rank, snippet in users 

221 ] 

222 

223 

224def _search_pages(session, search_statement, title_only, next_rank, page_size, context, include_places, include_guides): 

225 rank, snippet, execute_search_statement = _gen_search_elements( 

226 search_statement, 

227 title_only, 

228 next_rank, 

229 page_size, 

230 [PageVersion.title], 

231 [PageVersion.address], 

232 [], 

233 [PageVersion.content], 

234 ) 

235 if not include_places and not include_guides: 

236 return [] 

237 

238 latest_pages = ( 

239 select(func.max(PageVersion.id).label("id")) 

240 .join(Page, Page.id == PageVersion.page_id) 

241 .where( 

242 or_( 

243 (Page.type == PageType.place) if include_places else False, 

244 (Page.type == PageType.guide) if include_guides else False, 

245 ) 

246 ) 

247 .group_by(PageVersion.page_id) 

248 .subquery() 

249 ) 

250 

251 pages = execute_search_statement( 

252 session, 

253 select(Page, rank, snippet) 

254 .join(PageVersion, PageVersion.page_id == Page.id) 

255 .join(latest_pages, latest_pages.c.id == PageVersion.id), 

256 ) 

257 

258 return [ 

259 search_pb2.Result( 

260 rank=rank, 

261 place=page_to_pb(session, page, context) if page.type == PageType.place else None, 

262 guide=page_to_pb(session, page, context) if page.type == PageType.guide else None, 

263 snippet=snippet, 

264 ) 

265 for page, rank, snippet in pages 

266 ] 

267 

268 

def _search_events(session, search_statement, title_only, next_rank, page_size, context):
    """
    Full text search over event occurrences that have not ended yet.

    Deleted occurrences are excluded, matching what EventSearch does. Returns a list of search_pb2.Result with
    `event` set.
    """
    rank, snippet, execute_search_statement = _gen_search_elements(
        search_statement,
        title_only,
        next_rank,
        page_size,
        [Event.title],
        [EventOccurrence.address, EventOccurrence.link],
        [],
        [EventOccurrence.content],
    )

    occurrences = execute_search_statement(
        session,
        select(EventOccurrence, rank, snippet)
        .join(Event, Event.id == EventOccurrence.event_id)
        # deleted events must not show up in search results
        .where(~EventOccurrence.is_deleted)
        .where(EventOccurrence.end_time >= func.now()),
    )

    return [
        search_pb2.Result(
            rank=occurrence_rank,
            event=event_to_pb(session, occurrence, context),
            snippet=occurrence_snippet,
        )
        for occurrence, occurrence_rank, occurrence_snippet in occurrences
    ]

296 

297 

298def _search_clusters( 

299 session, search_statement, title_only, next_rank, page_size, context, include_communities, include_groups 

300): 

301 if not include_communities and not include_groups: 

302 return [] 

303 

304 rank, snippet, execute_search_statement = _gen_search_elements( 

305 search_statement, 

306 title_only, 

307 next_rank, 

308 page_size, 

309 [Cluster.name], 

310 [PageVersion.address, PageVersion.title], 

311 [Cluster.description], 

312 [PageVersion.content], 

313 ) 

314 

315 latest_pages = ( 

316 select(func.max(PageVersion.id).label("id")) 

317 .join(Page, Page.id == PageVersion.page_id) 

318 .where(Page.type == PageType.main_page) 

319 .group_by(PageVersion.page_id) 

320 .subquery() 

321 ) 

322 

323 clusters = execute_search_statement( 

324 session, 

325 select(Cluster, rank, snippet) 

326 .join(Page, Page.owner_cluster_id == Cluster.id) 

327 .join(PageVersion, PageVersion.page_id == Page.id) 

328 .join(latest_pages, latest_pages.c.id == PageVersion.id) 

329 .where(Cluster.is_official_cluster if include_communities and not include_groups else True) 

330 .where(~Cluster.is_official_cluster if not include_communities and include_groups else True), 

331 ) 

332 

333 return [ 

334 search_pb2.Result( 

335 rank=rank, 

336 community=( 

337 community_to_pb(session, cluster.official_cluster_for_node, context) 

338 if cluster.is_official_cluster 

339 else None 

340 ), 

341 group=group_to_pb(session, cluster, context) if not cluster.is_official_cluster else None, 

342 snippet=snippet, 

343 ) 

344 for cluster, rank, snippet in clusters 

345 ] 

346 

347 

def _user_search_inner(request, context, session):
    """
    Shared filtering and pagination logic behind UserSearch and UserSearchV2.

    Builds a query over the users visible to the caller, applies every filter present on the request, and pages
    through the matches in order of descending recommendation score.

    If request.exactly_user_ids is non-empty, only that ID filter (plus visibility) is applied and all other
    filters are ignored.

    Returns a tuple (user_ids, next_page_token, total_items):
    * user_ids: list of at most page_size matching user IDs, best recommendation score first
    * next_page_token: encrypted token to fetch the next page, or None if there are no more results
    * total_items: number of matches over all pages (counted before the page cutoff is applied)

    Aborts with FAILED_PRECONDITION if a gender filter is used by a user without strong verification or if it
    doesn't include the user's own gender, and with NOT_FOUND if search_in_community_id doesn't exist.
    """
    user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()

    # Base statement with visibility filter
    statement = select(User.id, User.recommendation_score).where_users_visible(context)

    # If exactly_user_ids is present, only filter by those IDs and ignore all other filters
    # This is a bit of a hacky feature to help with the frontend map implementation
    if len(request.exactly_user_ids) > 0:
        statement = statement.where(User.id.in_(request.exactly_user_ids))
    else:
        # Apply all the normal filters
        if request.HasField("query"):
            pattern = f"%{request.query.value}%"
            searched_columns = [User.name, User.username]
            if not request.query_name_only:
                searched_columns += [
                    User.city,
                    User.hometown,
                    User.about_me,
                    User.things_i_like,
                    User.about_place,
                    User.additional_information,
                ]
            statement = statement.where(or_(*(column.ilike(pattern) for column in searched_columns)))

        if request.HasField("last_active"):
            raw_dt = to_aware_datetime(request.last_active)
            statement = statement.where(User.last_active >= last_active_coarsen(raw_dt))

        if len(request.gender) > 0:
            if not has_strong_verification(session, user):
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NEED_STRONG_VERIFICATION)
            elif user.gender not in request.gender:
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.MUST_INCLUDE_OWN_GENDER)
            else:
                statement = statement.where(User.gender.in_(request.gender))

        # repeated enum filters: (requested api values, column to filter, api -> sql mapping)
        enum_filters = (
            (request.hosting_status_filter, User.hosting_status, hostingstatus2sql),
            (request.meetup_status_filter, User.meetup_status, meetupstatus2sql),
            (request.smoking_location_filter, User.smoking_allowed, smokinglocation2sql),
            (request.sleeping_arrangement_filter, User.sleeping_arrangement, sleepingarrangement2sql),
            (request.parking_details_filter, User.parking_details, parkingdetails2sql),
        )
        for requested, column, api2sql in enum_filters:
            if len(requested) > 0:
                statement = statement.where(column.in_([api2sql[value] for value in requested]))

        # limits/default could be handled on the front end as well
        min_age = request.age_min.value if request.HasField("age_min") else 18
        max_age = request.age_max.value if request.HasField("age_max") else 200
        statement = statement.where((User.age >= min_age) & (User.age <= max_age))

        # a user matches if they speak at least one of the requested languages at the requested fluency or better
        if len(request.language_ability_filter) > 0:
            language_options = []
            for ability_filter in request.language_ability_filter:
                fluency_sql_value = fluency2sql.get(ability_filter.fluency)
                if fluency_sql_value is None:
                    # unknown fluency value, skip this entry
                    continue
                language_options.append(
                    and_(
                        LanguageAbility.language_code == ability_filter.code,
                        LanguageAbility.fluency >= fluency_sql_value,
                    )
                )
            # Use IN (subquery) rather than a plain JOIN: a join returns one row per matching language ability,
            # so a user matching several requested languages would be listed (and counted) more than once.
            # If no entry had a usable fluency there is nothing to filter on, so the filter is skipped rather
            # than calling or_() with no arguments.
            if language_options:
                statement = statement.where(
                    User.id.in_(select(LanguageAbility.user_id).where(or_(*language_options)))
                )

        if request.HasField("profile_completed"):
            statement = statement.where(User.has_completed_profile == request.profile_completed.value)
        if request.HasField("guests"):
            statement = statement.where(User.max_guests >= request.guests.value)

        # optional boolean filters whose request field and User column share the same name
        same_name_filters = (
            "last_minute",
            "has_pets",
            "accepts_pets",
            "has_kids",
            "accepts_kids",
            "has_housemates",
            "wheelchair_accessible",
            "smokes_at_home",
            "drinking_allowed",
            "drinks_at_home",
            "parking",
            "camping_ok",
        )
        for field_name in same_name_filters:
            if request.HasField(field_name):
                statement = statement.where(getattr(User, field_name) == getattr(request, field_name).value)

        if request.HasField("search_in_area"):
            # EPSG4326 measures distance in decimal degrees
            # we want to check whether two circles overlap, so check if the distance between their centers is
            # less than the sum of their radii, divided by 111111 m ~= 1 degree (at the equator)
            search_point = create_coordinate(request.search_in_area.lat, request.search_in_area.lng)
            statement = statement.where(
                func.ST_DWithin(
                    # old:
                    # User.geom, search_point, (User.geom_radius + request.search_in_area.radius) / 111111
                    # a fixed 1000 m is used instead of the user's geom radius; this is an optimization that
                    # speeds up the db queries since it doesn't need to look up the user's geom radius
                    User.geom,
                    search_point,
                    (1000 + request.search_in_area.radius) / 111111,
                )
            )
        if request.HasField("search_in_rectangle"):
            rectangle = request.search_in_rectangle
            statement = statement.where(
                func.ST_Within(
                    User.geom,
                    func.ST_MakeEnvelope(rectangle.lng_min, rectangle.lat_min, rectangle.lng_max, rectangle.lat_max, 4326),
                )
            )
        if request.HasField("search_in_community_id"):
            # could do a join here as well, but this is just simpler
            node = session.execute(select(Node).where(Node.id == request.search_in_community_id)).scalar_one_or_none()
            if not node:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
            statement = statement.where(func.ST_Contains(node.geom, User.geom))

        if request.only_with_references:
            # distinct, so that the join doesn't multiply users with several references
            references = (
                select(Reference.to_user_id.label("user_id"))
                .where_users_column_visible(context, Reference.from_user_id)
                .distinct()
                .subquery()
            )
            statement = statement.join(references, references.c.user_id == User.id)

        if request.only_with_strong_verification:
            statement = statement.join(
                StrongVerificationAttempt,
                and_(
                    StrongVerificationAttempt.user_id == User.id,
                    StrongVerificationAttempt.has_strong_verification(User),
                ),
            )
        # TODO:
        # bool friends_only = 13;

    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    # the page token is the (encrypted) recommendation score to continue from; 1e10 stands in for "from the top"
    next_recommendation_score = float(decrypt_page_token(request.page_token)) if request.page_token else 1e10
    # counted before the page cutoff below, so this is the total over all pages
    total_items = session.execute(select(func.count()).select_from(statement.subquery())).scalar()

    statement = (
        statement.where(User.recommendation_score <= next_recommendation_score)
        .order_by(User.recommendation_score.desc())
        # fetch one extra row to find out whether there is a next page and where it starts
        .limit(page_size + 1)
    )
    rows = session.execute(statement).all()

    user_ids = [user_id for user_id, _ in rows]
    if len(rows) > page_size:
        # the extra row is the first one of the next page, its score becomes the next token
        _, next_score = rows[-1]
        next_page_token = encrypt_page_token(str(next_score))
    else:
        next_page_token = None
    return user_ids[:page_size], next_page_token, total_items

538 

539 

class Search(search_pb2_grpc.SearchServicer):
    """
    Servicer for the search API: general full text search, user search (two response flavours) and event search.
    """

    def Search(self, request, context, session):
        """
        Full text search across users, pages, events and clusters, merged into a single list ordered by rank.

        Each kind is searched separately for up to page_size + 1 rows, the results are then merged, sorted by
        descending rank and cut to page_size.
        """
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        # the page token is just the rank to continue from
        # this is not an ideal page token, some results have equal rank (unlikely)
        next_rank = float(request.page_token) if request.page_token else None

        shared_args = (session, request.query, request.title_only, next_rank, page_size, context)
        all_results = [
            *_search_users(*shared_args, request.include_users),
            *_search_pages(*shared_args, request.include_places, request.include_guides),
            *_search_events(*shared_args),
            *_search_clusters(*shared_args, request.include_communities, request.include_groups),
        ]
        all_results.sort(key=lambda result: result.rank, reverse=True)

        # the first result that didn't fit on this page marks where the next page starts
        has_more = len(all_results) > page_size
        return search_pb2.SearchRes(
            results=all_results[:page_size],
            next_page_token=str(all_results[page_size].rank) if has_more else None,
        )

    def UserSearch(self, request, context, session):
        """
        Filtered user search returning full user protos, in the order produced by _user_search_inner.
        """
        user_ids, next_page_token, total_items = _user_search_inner(request, context, session)

        # load all users of this page in one query, then emit them in ranked order
        users_by_id = dict(session.execute(select(User.id, User).where(User.id.in_(user_ids))).all())

        results = []
        for user_id in user_ids:
            results.append(
                search_pb2.Result(
                    rank=1,
                    user=user_model_to_pb(users_by_id[user_id], session, context),
                )
            )

        return search_pb2.UserSearchRes(
            results=results,
            next_page_token=next_page_token,
            total_items=total_items,
        )

    def UserSearchV2(self, request, context, session):
        """
        Filtered user search returning lightweight SearchUser protos.

        Data comes from the lite_users and user_response_rates materialized views, plus a few columns read
        directly from the users table and the reference counts.
        """
        user_ids, next_page_token, total_items = _user_search_inner(request, context, session)

        lite_rows = session.execute(select(lite_users).where(lite_users.c.id.in_(user_ids))).all()
        lite_users_by_id = {lite_user.id: lite_user for lite_user in lite_rows}

        rate_rows = session.execute(
            select(user_response_rates).where(user_response_rates.c.user_id.in_(user_ids))
        ).all()
        response_rates_by_id = {resp_rate.user_id: resp_rate for resp_rate in rate_rows}

        # columns fetched from the users table itself rather than from lite_users
        user_rows = session.execute(
            select(
                User.id, User.about_me, User.gender, User.last_active, User.hosting_status, User.meetup_status
            ).where(User.id.in_(user_ids))
        ).all()
        db_user_data_by_id = {}
        for row_user_id, about_me, gender, last_active, hosting_status, meetup_status in user_rows:
            db_user_data_by_id[row_user_id] = (about_me, gender, last_active, hosting_status, meetup_status)

        ref_counts_by_user_id = get_num_references(session, user_ids)

        def _user_to_search_user(user_id):
            # NOTE(review): raises KeyError if the user is missing from lite_users, presumably possible while
            # the materialized view is stale; confirm
            lite_user = lite_users_by_id[user_id]
            about_me, gender, last_active, hosting_status, meetup_status = db_user_data_by_id[user_id]
            lat, lng = get_coordinates(lite_user.geom)

            avatar_filename = lite_user.avatar_filename
            if avatar_filename:
                avatar_url = urls.media_url(filename=avatar_filename, size="full")
                avatar_thumbnail_url = urls.media_url(filename=avatar_filename, size="thumbnail")
            else:
                avatar_url = None
                avatar_thumbnail_url = None

            return search_pb2.SearchUser(
                user_id=lite_user.id,
                username=lite_user.username,
                name=lite_user.name,
                city=lite_user.city,
                has_completed_profile=lite_user.has_completed_profile,
                lat=lat,
                lng=lng,
                profile_snippet=about_me,
                num_references=ref_counts_by_user_id.get(lite_user.id, 0),
                gender=gender,
                age=int(lite_user.age),
                last_active=Timestamp_from_datetime(last_active_coarsen(last_active)),
                hosting_status=hostingstatus2api[hosting_status],
                meetup_status=meetupstatus2api[meetup_status],
                avatar_url=avatar_url,
                avatar_thumbnail_url=avatar_thumbnail_url,
                has_strong_verification=lite_user.has_strong_verification,
                **response_rate_to_pb(response_rates_by_id.get(user_id)),
            )

        return search_pb2.UserSearchV2Res(
            results=[_user_to_search_user(user_id) for user_id in user_ids],
            next_page_token=next_page_token,
            total_items=total_items,
        )

    def EventSearch(self, request, context, session):
        """
        Filtered search over non-deleted event occurrences.

        Supports two pagination modes: by page_number (offset based) or by page_token (a unix timestamp in
        milliseconds of where we left off). If page_number is given, page_token is ignored and no
        next_page_token is returned.
        """
        statement = (
            select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted)
        )

        if request.HasField("query"):
            pattern = f"%{request.query.value}%"
            if request.query_title_only:
                statement = statement.where(Event.title.ilike(pattern))
            else:
                statement = statement.where(
                    or_(
                        Event.title.ilike(pattern),
                        EventOccurrence.content.ilike(pattern),
                        EventOccurrence.address.ilike(pattern),
                    )
                )

        # the online/offline filters test whether the occurrence has a geom; "== None" renders as IS NULL
        if request.only_online:
            statement = statement.where(EventOccurrence.geom == None)  # noqa: E711
        elif request.only_offline:
            statement = statement.where(EventOccurrence.geom != None)  # noqa: E711

        # these four flags are OR-ed together: an occurrence passes if it satisfies any requested one
        if request.subscribed or request.attending or request.organizing or request.my_communities:
            any_of = []

            if request.subscribed:
                statement = statement.outerjoin(
                    EventSubscription,
                    and_(EventSubscription.event_id == Event.id, EventSubscription.user_id == context.user_id),
                )
                any_of.append(EventSubscription.user_id != None)  # noqa: E711
            if request.organizing:
                statement = statement.outerjoin(
                    EventOrganizer,
                    and_(EventOrganizer.event_id == Event.id, EventOrganizer.user_id == context.user_id),
                )
                any_of.append(EventOrganizer.user_id != None)  # noqa: E711
            if request.attending:
                statement = statement.outerjoin(
                    EventOccurrenceAttendee,
                    and_(
                        EventOccurrenceAttendee.occurrence_id == EventOccurrence.id,
                        EventOccurrenceAttendee.user_id == context.user_id,
                    ),
                )
                any_of.append(EventOccurrenceAttendee.user_id != None)  # noqa: E711
            if request.my_communities:
                # nodes whose official cluster the calling user is subscribed to
                community_query = (
                    select(Node.id)
                    .join(Cluster, Cluster.parent_node_id == Node.id)
                    .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
                    .where(ClusterSubscription.user_id == context.user_id)
                    .where(Cluster.is_official_cluster)
                    .order_by(Node.id)
                    .limit(100000)
                )
                my_communities = session.execute(community_query).scalars().all()
                any_of.append(Event.parent_node_id.in_(my_communities))

            statement = statement.where(or_(*any_of))

        if not request.include_cancelled:
            statement = statement.where(~EventOccurrence.is_cancelled)

        if request.HasField("search_in_area"):
            # EPSG4326 measures distance in decimal degrees
            # the event's distance from the search point must be less than the search radius plus a fixed
            # 1000 m, divided by 111111 m ~= 1 degree (at the equator)
            search_point = create_coordinate(request.search_in_area.lat, request.search_in_area.lng)
            max_distance = (1000 + request.search_in_area.radius) / 111111
            statement = statement.where(func.ST_DWithin(EventOccurrence.geom, search_point, max_distance))
        if request.HasField("search_in_rectangle"):
            rectangle = request.search_in_rectangle
            envelope = func.ST_MakeEnvelope(
                rectangle.lng_min,
                rectangle.lat_min,
                rectangle.lng_max,
                rectangle.lat_max,
                4326,
            )
            statement = statement.where(func.ST_Within(EventOccurrence.geom, envelope))
        if request.HasField("search_in_community_id"):
            # could do a join here as well, but this is just simpler
            node = session.execute(select(Node).where(Node.id == request.search_in_community_id)).scalar_one_or_none()
            if not node:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
            statement = statement.where(func.ST_Contains(node.geom, EventOccurrence.geom))

        if request.HasField("after"):
            statement = statement.where(EventOccurrence.start_time > to_aware_datetime(request.after))
        if request.HasField("before"):
            statement = statement.where(EventOccurrence.end_time < to_aware_datetime(request.before))

        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        # the page token is a unix timestamp of where we left off; it is ignored when paging by page number
        if request.page_token and not request.page_number:
            page_token = dt_from_millis(int(request.page_token))
        else:
            page_token = now()

        one_second = timedelta(seconds=1)
        if request.past:
            # past events, most recently started first
            statement = statement.where(EventOccurrence.end_time < page_token + one_second).order_by(
                EventOccurrence.start_time.desc()
            )
        else:
            # current and upcoming events, soonest first
            statement = statement.where(EventOccurrence.end_time > page_token - one_second).order_by(
                EventOccurrence.start_time.asc()
            )

        # NOTE(review): this count includes the page_token time filter above, so with token based paging it
        # only covers items from the current page onwards; confirm this is intended
        total_items = session.execute(select(func.count()).select_from(statement.subquery())).scalar()

        if request.page_number:
            # offset based paging by page number (pages are 1-indexed)
            offset = (request.page_number - 1) * page_size
            statement = statement.offset(offset).limit(page_size)
        else:
            # token based paging: fetch one extra row to know whether there is a next page
            statement = statement.limit(page_size + 1)
        occurrences = session.execute(statement).scalars().all()

        if len(occurrences) > page_size:
            next_page_token = str(millis_from_dt(occurrences[-1].end_time))
        else:
            next_page_token = None

        return search_pb2.EventSearchRes(
            events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
            next_page_token=next_page_token,
            total_items=total_items,
        )