Coverage for app / backend / src / couchers / servicers / search.py: 83%

283 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 09:44 +0000

1""" 

2See //docs/search.md for an overview. 

3""" 

4 

5from datetime import timedelta 

6from typing import Any, cast 

7 

8import grpc 

9from sqlalchemy import literal_column, select 

10from sqlalchemy.orm import Session 

11from sqlalchemy.sql import and_, func, or_ 

12 

13from couchers import urls 

14from couchers.context import CouchersContext 

15from couchers.crypto import decrypt_page_token, encrypt_page_token 

16from couchers.event_log import log_event 

17from couchers.helpers.completed_profile import has_completed_profile_expression 

18from couchers.helpers.strong_verification import has_strong_verification 

19from couchers.materialized_views import LiteUser, UserResponseRate 

20from couchers.models import ( 

21 Cluster, 

22 ClusterSubscription, 

23 Event, 

24 EventOccurrence, 

25 EventOccurrenceAttendee, 

26 EventOrganizer, 

27 EventSubscription, 

28 LanguageAbility, 

29 Node, 

30 Page, 

31 PageType, 

32 PageVersion, 

33 Reference, 

34 StrongVerificationAttempt, 

35 User, 

36) 

37from couchers.proto import search_pb2, search_pb2_grpc 

38from couchers.reranker import reranker 

39from couchers.servicers.api import ( 

40 fluency2sql, 

41 get_num_references, 

42 hostingstatus2api, 

43 hostingstatus2sql, 

44 meetupstatus2api, 

45 meetupstatus2sql, 

46 parkingdetails2sql, 

47 response_rate_to_pb, 

48 sleepingarrangement2sql, 

49 smokinglocation2sql, 

50 user_model_to_pb, 

51) 

52from couchers.servicers.communities import community_to_pb 

53from couchers.servicers.events import event_to_pb 

54from couchers.servicers.groups import group_to_pb 

55from couchers.servicers.pages import page_to_pb 

56from couchers.sql import to_bool, users_visible, where_moderated_content_visible, where_users_column_visible 

57from couchers.utils import ( 

58 Timestamp_from_datetime, 

59 create_coordinate, 

60 dt_from_millis, 

61 get_coordinates, 

62 last_active_coarsen, 

63 millis_from_dt, 

64 now, 

65 to_aware_datetime, 

66) 

67 

68# searches are a bit expensive, we'd rather send back a bunch of results at once than lots of small pages 

69MAX_PAGINATION_LENGTH = 100 

70 

71REGCONFIG = "english" 

72TRI_SIMILARITY_THRESHOLD = 0.6 

73TRI_SIMILARITY_WEIGHT = 5 

74 

75 

76def _join_with_space(coalesces: list[Any]) -> Any: 

77 # the objects in coalesces are not strings, so we can't do " ".join(coalesces). They're SQLAlchemy magic. 

78 if not coalesces: 78 ↛ 79line 78 didn't jump to line 79 because the condition on line 78 was never true

79 return "" 

80 out = coalesces[0] 

81 for coalesce in coalesces[1:]: 

82 out += " " + coalesce 

83 return out 

84 

85 

def _build_tsv(A: list[Any], B: list[Any] | None = None, C: list[Any] | None = None, D: list[Any] | None = None) -> Any:
    """
    Builds a weighted tsvector expression out of up to four groups of columns.

    A is always included and gets weight 'A'. B, C and D get weights 'B', 'C' and 'D' and are only appended when
    they are non-empty. Inside a group every value is coalesced to "" and the values are joined with spaces before
    being handed to to_tsvector.
    """

    def weighted(bits: list[Any], weight: Any) -> Any:
        text = _join_with_space([func.coalesce(bit, "") for bit in bits])
        return func.setweight(func.to_tsvector(REGCONFIG, text), weight)

    # Use literal_column for weight letters to avoid psycopg3 type binding issues
    # PostgreSQL's setweight expects "char" type (internal single-byte type)
    tsv: Any = weighted(A, literal_column("'A'"))
    for bits, weight in ((B, "'B'"), (C, "'C'"), (D, "'D'")):
        if bits:
            tsv = tsv.concat(weighted(bits, literal_column(weight)))
    return tsv

121 

122 

def _build_doc(A: list[Any], B: list[Any] | None = None, C: list[Any] | None = None, D: list[Any] | None = None) -> Any:
    """
    Builds the plain document text (no to_tsvector, no weights); this is what snippets are extracted from.

    Every value is coalesced to "", values within a group are joined with spaces, and each non-empty group among
    B, C and D is appended to A's text after a single space.
    """

    def flatten(bits: list[Any]) -> Any:
        return _join_with_space([func.coalesce(bit, "") for bit in bits])

    doc = flatten(A)
    for bits in (B, C, D):
        if bits:
            doc = doc + (" " + flatten(bits))
    return doc

138 

139 

def _similarity(statement: Any, text: str) -> Any:
    """
    Builds a word_similarity(unaccent(statement), unaccent(text)) SQL expression.

    Both arguments are run through unaccent and then passed to word_similarity in the order given. Within this
    module it is called with the search string first and the title expression second.
    """
    first = func.unaccent(statement)
    second = func.unaccent(text)
    return func.word_similarity(first, second)

142 

143 

def _gen_search_elements(
    statement: str,
    title_only: bool,
    next_rank: float | None,
    page_size: int,
    A: list[Any],
    B: list[Any] | None = None,
    C: list[Any] | None = None,
    D: list[Any] | None = None,
) -> tuple[Any, Any, Any]:
    """
    Given the search text and four sets of fields, (A, B, C, D), generates the postgres expressions needed for
    full text search.

    The four sets are in decreasing order of "importance" for ranking. A should be the "title", the others can be
    anything.

    With title_only=False a row matches if the weighted tsvector matches the tsquery or the trigram similarity
    against the title exceeds TRI_SIMILARITY_THRESHOLD; the rank is the weighted similarity plus ts_rank_cd.

    With title_only=True only the trigram similarity against A is used, both for matching and as the rank.

    Returns a tuple (rank, snippet, execute_search_statement): two labelled column expressions ("rank" and
    "snippet") to put into the select, and a function that takes a session and that select, applies filtering,
    the next_rank cutoff, descending rank ordering and a limit of page_size + 1, and returns all rows.
    """
    B = B or []
    C = C or []
    D = D or []

    # pieces shared by both modes
    title = _build_doc(A)
    # trigram-based text similarity between the search text and the title
    sim = _similarity(statement, title)
    # a postgres tsquery object; in title_only mode it is used only for the headline
    tsq = func.websearch_to_tsquery(REGCONFIG, statement)
    # document to generate snippet from
    doc = _build_doc(A, B, C, D)
    # the snippet with results highlighted
    snippet = func.ts_headline(REGCONFIG, doc, tsq, "StartSel=**,StopSel=**").label("snippet")

    if title_only:
        rank = sim.label("rank")
        matches = sim > TRI_SIMILARITY_THRESHOLD
    else:
        # the tsvector object that we want to search against with our tsquery
        tsv = _build_tsv(A, B, C, D)
        # ranking algo, weigh the similarity a lot, the text-based ranking less
        rank = (TRI_SIMILARITY_WEIGHT * sim + func.ts_rank_cd(tsv, tsq)).label("rank")
        matches = or_(tsv.op("@@")(tsq), sim > TRI_SIMILARITY_THRESHOLD)

    def execute_search_statement(session: Session, orig_statement: Any) -> list[Any]:
        """
        Does the right search filtering, limiting, and ordering for the initial statement
        """
        query = (
            orig_statement.where(matches)
            .where(rank <= next_rank if next_rank is not None else True)
            .order_by(rank.desc())
            .limit(page_size + 1)
        )
        return cast(list[Any], session.execute(query).all())

    return rank, snippet, execute_search_statement

228 

229 

def _search_users(
    session: Session,
    search_statement: str,
    title_only: bool,
    next_rank: float | None,
    page_size: int,
    context: CouchersContext,
    include_users: bool,
) -> list[search_pb2.Result]:
    """
    Runs the text search over users visible in the given context and wraps each hit in a search_pb2.Result.

    Username and name form the title (A); city is B, about_me is C, and the remaining free-text profile fields
    are D. Returns an empty list without touching the database when include_users is false.
    """
    if not include_users:
        return []

    rank, snippet, execute_search_statement = _gen_search_elements(
        search_statement,
        title_only,
        next_rank,
        page_size,
        [User.username, User.name],
        [User.city],
        [User.about_me],
        [User.things_i_like, User.about_place, User.additional_information],
    )

    visible_users = select(User, rank, snippet).where(users_visible(context))
    rows = execute_search_statement(session, visible_users)

    return [
        search_pb2.Result(
            rank=user_rank,
            user=user_model_to_pb(user, session, context),
            snippet=user_snippet,
        )
        for user, user_rank, user_snippet in rows
    ]

262 

263 

def _search_pages(
    session: Session,
    search_statement: str,
    title_only: bool,
    next_rank: float | None,
    page_size: int,
    context: CouchersContext,
    include_places: bool,
    include_guides: bool,
) -> list[search_pb2.Result]:
    """
    Runs the text search over the latest version of place and/or guide pages.

    Only the newest PageVersion (highest id) of each page is searched. Each hit is returned as a
    search_pb2.Result with either `place` or `guide` set depending on the page type. Returns an empty list when
    neither places nor guides are requested.
    """
    if not (include_places or include_guides):
        return []

    rank, snippet, execute_search_statement = _gen_search_elements(
        search_statement,
        title_only,
        next_rank,
        page_size,
        [PageVersion.title],
        [PageVersion.address],
        [],
        [PageVersion.content],
    )

    wants_place = (Page.type == PageType.place) if include_places else to_bool(False)
    wants_guide = (Page.type == PageType.guide) if include_guides else to_bool(False)

    # ids of the newest version of every page of a requested type
    latest_pages = (
        select(func.max(PageVersion.id).label("id"))
        .join(Page, Page.id == PageVersion.page_id)
        .where(or_(wants_place, wants_guide))
        .group_by(PageVersion.page_id)
        .subquery()
    )

    page_query = (
        select(Page, rank, snippet)
        .join(PageVersion, PageVersion.page_id == Page.id)
        .join(latest_pages, latest_pages.c.id == PageVersion.id)
    )
    rows = execute_search_statement(session, page_query)

    return [
        search_pb2.Result(
            rank=page_rank,
            place=page_to_pb(session, page, context) if page.type == PageType.place else None,
            guide=page_to_pb(session, page, context) if page.type == PageType.guide else None,
            snippet=page_snippet,
        )
        for page, page_rank, page_snippet in rows
    ]

316 

317 

def _search_events(
    session: Session,
    search_statement: str,
    title_only: bool,
    next_rank: float | None,
    page_size: int,
    context: CouchersContext,
) -> list[search_pb2.Result]:
    """
    Runs the text search over event occurrences that have not ended yet and wraps each hit in a
    search_pb2.Result.

    The event title is the search title (A); the occurrence's address and link are B and its content is D.
    Occurrences are restricted to those the moderation visibility filter lets through for this context, and
    deleted occurrences are excluded, matching what EventSearch does.
    """
    rank, snippet, execute_search_statement = _gen_search_elements(
        search_statement,
        title_only,
        next_rank,
        page_size,
        [Event.title],
        [EventOccurrence.address, EventOccurrence.link],
        [],
        [EventOccurrence.content],
    )

    upcoming = (
        select(EventOccurrence, rank, snippet)
        .join(Event, Event.id == EventOccurrence.event_id)
        # deleted occurrences must never show up in search results
        .where(~EventOccurrence.is_deleted)
        .where(EventOccurrence.end_time >= func.now())
    )

    occurrences = execute_search_statement(
        session,
        where_moderated_content_visible(upcoming, context, EventOccurrence, is_list_operation=True),
    )

    return [
        search_pb2.Result(
            rank=occurrence_rank,
            event=event_to_pb(session, occurrence, context),
            snippet=occurrence_snippet,
        )
        for occurrence, occurrence_rank, occurrence_snippet in occurrences
    ]

357 

358 

def _search_clusters(
    session: Session,
    search_statement: str,
    title_only: bool,
    next_rank: float | None,
    page_size: int,
    context: CouchersContext,
    include_communities: bool,
    include_groups: bool,
) -> list[search_pb2.Result]:
    """
    Runs the text search over clusters, using the latest version of each cluster's main page as extra text.

    Official clusters are returned as communities, all other clusters as groups. When only one of the two kinds
    is requested the query is restricted accordingly; when neither is requested an empty list is returned
    without querying.
    """
    if not (include_communities or include_groups):
        return []

    rank, snippet, execute_search_statement = _gen_search_elements(
        search_statement,
        title_only,
        next_rank,
        page_size,
        [Cluster.name],
        [PageVersion.address, PageVersion.title],
        [Cluster.description],
        [PageVersion.content],
    )

    # ids of the newest version of every main page
    latest_pages = (
        select(func.max(PageVersion.id).label("id"))
        .join(Page, Page.id == PageVersion.page_id)
        .where(Page.type == PageType.main_page)
        .group_by(PageVersion.page_id)
        .subquery()
    )

    only_communities = include_communities and not include_groups
    only_groups = include_groups and not include_communities

    cluster_query = (
        select(Cluster, rank, snippet)
        .join(Page, Page.owner_cluster_id == Cluster.id)
        .join(PageVersion, PageVersion.page_id == Page.id)
        .join(latest_pages, latest_pages.c.id == PageVersion.id)
        .where(Cluster.is_official_cluster if only_communities else to_bool(True))
        .where(~Cluster.is_official_cluster if only_groups else to_bool(True))
    )
    rows = execute_search_statement(session, cluster_query)

    results = []
    for cluster, cluster_rank, cluster_snippet in rows:
        if cluster.is_official_cluster:
            community = community_to_pb(session, cluster.official_cluster_for_node, context)
            group = None
        else:
            community = None
            group = group_to_pb(session, cluster, context)
        results.append(
            search_pb2.Result(
                rank=cluster_rank,
                community=community,
                group=group,
                snippet=cluster_snippet,
            )
        )
    return results

414 

415 

def _escape_like(value: str) -> str:
    """Escapes the LIKE/ILIKE metacharacters (backslash, % and _) in value, using backslash as the escape char."""
    return value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


def _user_search_inner(
    request: search_pb2.UserSearchReq, context: CouchersContext, session: Session
) -> tuple[list[int], str | None, int]:
    """
    Builds and runs the filtered user search shared by UserSearch and UserSearchV2.

    Returns a tuple (user_ids, next_page_token, total_items):
    - user_ids: at most one page of matching user ids, ordered by descending recommendation score
    - next_page_token: encrypted recommendation score to resume from, or None if there are no more results
    - total_items: number of users matching the filters, ignoring pagination

    Aborts with FAILED_PRECONDITION if same_gender_only is requested by a user without strong verification, and
    with NOT_FOUND if search_in_community_id refers to a community that does not exist.
    """
    user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()

    # Base statement with visibility filter
    statement = select(User.id, User.recommendation_score).where(users_visible(context))
    # make sure that only users who are in LiteUser show up
    statement = statement.join(LiteUser, LiteUser.id == User.id)

    # If exactly_user_ids is present, only filter by those IDs and ignore all other filters
    # This is a bit of a hacky feature to help with the frontend map implementation
    if len(request.exactly_user_ids) > 0:
        statement = statement.where(User.id.in_(request.exactly_user_ids))
    else:
        # Apply all the normal filters
        if request.HasField("query"):
            # escape % and _ so that the user's text is matched literally instead of being treated as wildcards
            pattern = f"%{_escape_like(request.query.value)}%"
            searched_columns = [User.name, User.username]
            if not request.query_name_only:
                searched_columns += [
                    User.city,
                    User.hometown,
                    User.about_me,
                    User.things_i_like,
                    User.about_place,
                    User.additional_information,
                ]
            statement = statement.where(or_(*[column.ilike(pattern, escape="\\") for column in searched_columns]))

        if request.HasField("last_active"):
            raw_dt = to_aware_datetime(request.last_active)
            statement = statement.where(User.last_active >= last_active_coarsen(raw_dt))

        if request.same_gender_only:
            if not has_strong_verification(session, user):
                context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "need_strong_verification")
            statement = statement.where(User.gender == user.gender)

        if len(request.hosting_status_filter) > 0:
            statement = statement.where(
                User.hosting_status.in_([hostingstatus2sql[status] for status in request.hosting_status_filter])
            )
        if len(request.meetup_status_filter) > 0:
            statement = statement.where(
                User.meetup_status.in_([meetupstatus2sql[status] for status in request.meetup_status_filter])
            )
        if len(request.smoking_location_filter) > 0:
            statement = statement.where(
                User.smoking_allowed.in_([smokinglocation2sql[loc] for loc in request.smoking_location_filter])
            )
        if len(request.sleeping_arrangement_filter) > 0:
            statement = statement.where(
                User.sleeping_arrangement.in_(
                    [sleepingarrangement2sql[arr] for arr in request.sleeping_arrangement_filter]
                )
            )
        if len(request.parking_details_filter) > 0:
            statement = statement.where(
                User.parking_details.in_([parkingdetails2sql[det] for det in request.parking_details_filter])
            )

        # limits/default could be handled on the front end as well
        min_age = request.age_min.value if request.HasField("age_min") else 18
        max_age = request.age_max.value if request.HasField("age_max") else 200

        statement = statement.where((User.age >= min_age) & (User.age <= max_age))

        # language filter: a user matches if they have at least one of the requested languages at (or above) the
        # requested fluency
        if len(request.language_ability_filter) > 0:
            language_options = []
            for ability_filter in request.language_ability_filter:
                fluency_sql_value = fluency2sql.get(ability_filter.fluency)

                if fluency_sql_value is None:
                    continue
                language_options.append(
                    and_(
                        (LanguageAbility.language_code == ability_filter.code),
                        (LanguageAbility.fluency >= (fluency_sql_value)),
                    )
                )
            # Entries with an unknown fluency are skipped above; if nothing usable is left, don't filter at all.
            if language_options:
                # Use EXISTS rather than a JOIN: with a join, a user who matches several of the requested
                # languages would show up once per match, duplicating results and inflating total_items.
                statement = statement.where(
                    select(LanguageAbility.user_id)
                    .where(LanguageAbility.user_id == User.id)
                    .where(or_(*language_options))
                    .correlate(User)
                    .exists()
                )

        if request.HasField("profile_completed"):
            statement = statement.where(has_completed_profile_expression() == request.profile_completed.value)
        if request.HasField("guests"):
            statement = statement.where(User.max_guests >= request.guests.value)

        # optional boolean filters where the request field and the User column share a name and are compared
        # for equality
        for field_name in (
            "last_minute",
            "has_pets",
            "accepts_pets",
            "has_kids",
            "accepts_kids",
            "has_housemates",
            "wheelchair_accessible",
            "smokes_at_home",
            "drinking_allowed",
            "drinks_at_home",
            "parking",
            "camping_ok",
        ):
            if request.HasField(field_name):
                statement = statement.where(getattr(User, field_name) == getattr(request, field_name).value)

        if request.HasField("search_in_area"):
            # EPSG4326 measures distance in decimal degrees
            # we want to check whether two circles overlap, so check if the distance between their centers is less
            # than the sum of their radii, divided by 111111 m ~= 1 degree (at the equator)
            search_point = create_coordinate(request.search_in_area.lat, request.search_in_area.lng)
            statement = statement.where(
                func.ST_DWithin(
                    # old:
                    # User.geom, search_point, (User.geom_radius + request.search_in_area.radius) / 111111
                    # this is an optimization that speeds up the db queries since it doesn't need to look up the
                    # user's geom radius
                    User.geom,
                    search_point,
                    (1000 + request.search_in_area.radius) / 111111,
                )
            )
        if request.HasField("search_in_rectangle"):
            statement = statement.where(
                func.ST_Within(
                    User.geom,
                    func.ST_MakeEnvelope(
                        request.search_in_rectangle.lng_min,
                        request.search_in_rectangle.lat_min,
                        request.search_in_rectangle.lng_max,
                        request.search_in_rectangle.lat_max,
                        4326,
                    ),
                )
            )
        if request.HasField("search_in_community_id"):
            # could do a join here as well, but this is just simpler
            node = session.execute(select(Node).where(Node.id == request.search_in_community_id)).scalar_one_or_none()
            if not node:
                context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
            statement = statement.where(func.ST_Contains(node.geom, User.geom))

        if request.only_with_references:
            references = (
                where_users_column_visible(
                    select(Reference.to_user_id.label("user_id")),
                    context,
                    Reference.from_user_id,
                )
                .distinct()
                .subquery()
            )
            statement = statement.join(references, references.c.user_id == User.id)

        if request.only_with_strong_verification:
            statement = statement.join(
                StrongVerificationAttempt,
                and_(
                    StrongVerificationAttempt.user_id == User.id,
                    StrongVerificationAttempt.has_strong_verification(User),
                ),
            )
        # TODO:
        # bool friends_only = 13;

    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    next_recommendation_score = float(decrypt_page_token(request.page_token)) if request.page_token else 1e10
    total_items = cast(int, session.execute(select(func.count()).select_from(statement.subquery())).scalar())

    # fetch one row more than a page so we know whether there is a next page and where it starts
    statement = (
        statement.where(User.recommendation_score <= next_recommendation_score)
        .order_by(User.recommendation_score.desc())
        .limit(page_size + 1)
    )
    rows = session.execute(statement).all()
    user_ids: list[int] = [row[0] for row in rows]

    # the extra row (if any) is the first row of the next page; its score becomes the page token
    next_page_token = encrypt_page_token(str(rows[page_size][1])) if len(rows) > page_size else None
    return user_ids[:page_size], next_page_token, total_items

611 

612 

613class Search(search_pb2_grpc.SearchServicer): 

def Search(self, request: search_pb2.SearchReq, context: CouchersContext, session: Session) -> search_pb2.SearchRes:
    """
    Global text search across users, pages (places/guides), events and clusters (communities/groups).

    Each kind is searched separately with the same rank cutoff and page size; the results are then merged,
    sorted by descending rank and cut to one page. The page token is the rank of the first result that did not
    fit on the page.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    # this is not an ideal page token, some results have equal rank (unlikely)
    next_rank = float(request.page_token) if request.page_token else None

    # leading positional arguments shared by all four per-kind searches
    common = (session, request.query, request.title_only, next_rank, page_size, context)

    all_results = [
        *_search_users(*common, request.include_users),
        *_search_pages(*common, request.include_places, request.include_guides),
        *_search_events(*common),
        *_search_clusters(*common, request.include_communities, request.include_groups),
    ]
    all_results.sort(key=lambda result: result.rank, reverse=True)

    has_more = len(all_results) > page_size
    return search_pb2.SearchRes(
        results=all_results[:page_size],
        next_page_token=str(all_results[page_size].rank) if has_more else None,
    )

663 

def UserSearch(
    self, request: search_pb2.UserSearchReq, context: CouchersContext, session: Session
) -> search_pb2.UserSearchRes:
    """
    Filtered user search returning full user protos.

    Runs the shared search, logs a "search.performed" event describing the request, then loads the matching
    User rows and returns them in the order produced by the search. Every result has rank 1.
    """
    user_ids_to_return, next_page_token, total_items = _user_search_inner(request, context, session)

    has_filters = (
        len(request.hosting_status_filter) > 0
        or len(request.meetup_status_filter) > 0
        or len(request.smoking_location_filter) > 0
        or len(request.sleeping_arrangement_filter) > 0
        or len(request.parking_details_filter) > 0
        or len(request.language_ability_filter) > 0
        or request.only_with_references
        or request.only_with_strong_verification
    )
    event_payload = {
        "search_in": request.WhichOneof("search_in"),
        "has_query": request.HasField("query"),
        "has_filters": has_filters,
        "total_items": total_items,
    }
    log_event(context, session, "search.performed", event_payload)

    user_rows = session.execute(select(User.id, User).where(User.id.in_(user_ids_to_return))).all()
    user_ids_to_users: dict[int, User] = {found_id: found_user for found_id, found_user in user_rows}

    # iterate over the id list (not the dict) to keep the ordering chosen by the search
    results = [
        search_pb2.Result(
            rank=1,
            user=user_model_to_pb(user_ids_to_users[user_id], session, context),
        )
        for user_id in user_ids_to_return
    ]

    return search_pb2.UserSearchRes(
        results=results,
        next_page_token=next_page_token,
        total_items=total_items,
    )

707 

def UserSearchV2(
    self, request: search_pb2.UserSearchReq, context: CouchersContext, session: Session
) -> search_pb2.UserSearchV2Res:
    """
    Filtered user search returning lightweight SearchUser protos.

    Runs the shared search, then bulk-loads everything needed for the returned ids (LiteUser rows, response
    rates, a handful of User columns and reference counts) with one query each, assembles a SearchUser per id
    and passes the list through the reranker.
    """
    user_ids_to_return, next_page_token, total_items = _user_search_inner(request, context, session)

    lite_users = session.execute(select(LiteUser).where(LiteUser.id.in_(user_ids_to_return))).scalars().all()
    LiteUser_by_id = {lite_user.id: lite_user for lite_user in lite_users}

    response_rates = (
        session.execute(select(UserResponseRate).where(UserResponseRate.user_id.in_(user_ids_to_return)))
        .scalars()
        .all()
    )
    response_rate_by_id = {resp_rate.user_id: resp_rate for resp_rate in response_rates}

    # the columns that are read from User directly rather than from LiteUser
    user_columns = select(
        User.id,
        User.about_me,
        User.gender,
        User.last_active,
        User.hosting_status,
        User.meetup_status,
        User.joined,
    ).where(User.id.in_(user_ids_to_return))
    db_user_data_by_id = {}
    for user_id, about_me, gender, last_active, hosting_status, meetup_status, joined in session.execute(
        user_columns
    ).all():
        db_user_data_by_id[user_id] = (about_me, gender, last_active, hosting_status, meetup_status, joined)

    ref_counts_by_user_id = get_num_references(session, user_ids_to_return)

    def _user_to_search_user(user_id: int) -> search_pb2.SearchUser:
        lite_user = LiteUser_by_id[user_id]
        about_me, gender, last_active, hosting_status, meetup_status, joined = db_user_data_by_id[user_id]
        lat, lng = get_coordinates(lite_user.geom)

        # avatar urls are only set for users that have an avatar
        if lite_user.avatar_filename:
            avatar_url = urls.media_url(filename=lite_user.avatar_filename, size="full")
            avatar_thumbnail_url = urls.media_url(filename=lite_user.avatar_filename, size="thumbnail")
        else:
            avatar_url = None
            avatar_thumbnail_url = None

        return search_pb2.SearchUser(
            user_id=lite_user.id,
            username=lite_user.username,
            name=lite_user.name,
            city=lite_user.city,
            joined=Timestamp_from_datetime(last_active_coarsen(joined)),
            has_completed_profile=lite_user.has_completed_profile,
            has_completed_my_home=lite_user.has_completed_my_home,
            lat=lat,
            lng=lng,
            profile_snippet=about_me,
            num_references=ref_counts_by_user_id.get(lite_user.id, 0),
            gender=gender,
            age=int(lite_user.age),
            last_active=Timestamp_from_datetime(last_active_coarsen(last_active)),
            hosting_status=hostingstatus2api[hosting_status],
            meetup_status=meetupstatus2api[meetup_status],
            avatar_url=avatar_url,
            avatar_thumbnail_url=avatar_thumbnail_url,
            has_strong_verification=lite_user.has_strong_verification,
            **response_rate_to_pb(response_rate_by_id.get(user_id)),
        )

    search_users = [_user_to_search_user(user_id) for user_id in user_ids_to_return]
    results = reranker(search_users)

    return search_pb2.UserSearchV2Res(
        results=results,
        next_page_token=next_page_token,
        total_items=total_items,
    )

786 

def EventSearch(
    self, request: search_pb2.EventSearchReq, context: CouchersContext, session: Session
) -> search_pb2.EventSearchRes:
    """
    Search non-deleted event occurrences visible to the caller, narrowed by the filters set on the request.

    Supported filters (all optional, combined with AND unless noted):

    * ``query`` / ``query_title_only``: case-insensitive substring match on the event title, and (unless
      title-only) on the occurrence content and address. The text is matched literally.
    * ``only_online`` / ``only_offline``: occurrences without / with a geometry.
    * ``subscribed`` / ``organizing`` / ``attending`` / ``my_communities``: these are OR-ed together.
    * ``include_cancelled``: cancelled occurrences are excluded unless this is set.
    * ``search_in_area`` / ``search_in_rectangle`` / ``search_in_community_id``: geographic restrictions.
    * ``after`` / ``before``: bounds on start time / end time respectively.
    * ``past``: return occurrences that ended before the cutoff (newest first) instead of ones ending after
      it (oldest first).

    Pagination works in one of two modes:

    * ``page_number`` set: offset pagination relative to "now"; no ``next_page_token`` is produced.
    * otherwise: ``page_token`` is a unix timestamp in milliseconds marking where the previous page left off;
      one extra row is fetched to decide whether a ``next_page_token`` should be returned.

    ``total_items`` counts every row matching the filters including the time cutoff, so in token mode it is the
    number of items from the token onwards.

    Aborts with ``NOT_FOUND`` / ``community_not_found`` if ``search_in_community_id`` does not match a node.
    """
    statement = (
        select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted)
    )
    statement = where_moderated_content_visible(statement, context, EventOccurrence, is_list_operation=True)

    if request.HasField("query"):
        # Escape the LIKE metacharacters so user input is matched literally: otherwise "%" and "_" in the query
        # act as wildcards and a backslash swallows the following character.
        escaped_query = request.query.value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        pattern = f"%{escaped_query}%"
        if request.query_title_only:
            statement = statement.where(Event.title.ilike(pattern, escape="\\"))
        else:
            statement = statement.where(
                or_(
                    Event.title.ilike(pattern, escape="\\"),
                    EventOccurrence.content.ilike(pattern, escape="\\"),
                    EventOccurrence.address.ilike(pattern, escape="\\"),
                )
            )

    # an occurrence without a geometry is treated as online
    if request.only_online:
        statement = statement.where(EventOccurrence.geom.is_(None))
    elif request.only_offline:
        statement = statement.where(EventOccurrence.geom.is_not(None))

    if request.subscribed or request.attending or request.organizing or request.my_communities:
        # Each requested relationship is outer-joined for the current user; a row qualifies if at least one of
        # the joins matched (or the event belongs to one of the user's communities).
        where_ = []

        if request.subscribed:
            statement = statement.outerjoin(
                EventSubscription,
                and_(EventSubscription.event_id == Event.id, EventSubscription.user_id == context.user_id),
            )
            where_.append(EventSubscription.user_id.is_not(None))
        if request.organizing:
            statement = statement.outerjoin(
                EventOrganizer,
                and_(EventOrganizer.event_id == Event.id, EventOrganizer.user_id == context.user_id),
            )
            where_.append(EventOrganizer.user_id.is_not(None))
        if request.attending:
            statement = statement.outerjoin(
                EventOccurrenceAttendee,
                and_(
                    EventOccurrenceAttendee.occurrence_id == EventOccurrence.id,
                    EventOccurrenceAttendee.user_id == context.user_id,
                ),
            )
            where_.append(EventOccurrenceAttendee.user_id.is_not(None))
        if request.my_communities:
            # ids of the nodes whose official cluster the current user is subscribed to
            my_communities = (
                session.execute(
                    select(Node.id)
                    .join(Cluster, Cluster.parent_node_id == Node.id)
                    .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
                    .where(ClusterSubscription.user_id == context.user_id)
                    .where(Cluster.is_official_cluster)
                    .order_by(Node.id)
                    .limit(100000)
                )
                .scalars()
                .all()
            )
            where_.append(Event.parent_node_id.in_(my_communities))

        statement = statement.where(or_(*where_))

    if not request.include_cancelled:
        statement = statement.where(~EventOccurrence.is_cancelled)

    if request.HasField("search_in_area"):
        # EPSG4326 measures distance in decimal degrees, so the distance in metres is divided by
        # 111111 m ~= 1 degree (at the equator).
        # We check whether two circles overlap, i.e. whether the distance between their centers is less than
        # the sum of their radii. A constant 1000 m is used in place of a per-row radius, which saves the db
        # from looking one up for every row (the user search this was adapted from used User.geom_radius).
        search_point = create_coordinate(request.search_in_area.lat, request.search_in_area.lng)
        statement = statement.where(
            func.ST_DWithin(
                EventOccurrence.geom,
                search_point,
                (1000 + request.search_in_area.radius) / 111111,
            )
        )
    if request.HasField("search_in_rectangle"):
        statement = statement.where(
            func.ST_Within(
                EventOccurrence.geom,
                func.ST_MakeEnvelope(
                    request.search_in_rectangle.lng_min,
                    request.search_in_rectangle.lat_min,
                    request.search_in_rectangle.lng_max,
                    request.search_in_rectangle.lat_max,
                    4326,
                ),
            )
        )
    if request.HasField("search_in_community_id"):
        # could do a join here as well, but this is just simpler
        node = session.execute(select(Node).where(Node.id == request.search_in_community_id)).scalar_one_or_none()
        if not node:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
        statement = statement.where(func.ST_Contains(node.geom, EventOccurrence.geom))

    if request.HasField("after"):
        after_time = to_aware_datetime(request.after)
        statement = statement.where(EventOccurrence.start_time > after_time)
    if request.HasField("before"):
        before_time = to_aware_datetime(request.before)
        statement = statement.where(EventOccurrence.end_time < before_time)

    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    # the page token is a unix timestamp (in milliseconds) of where we left off; it is ignored when paginating
    # by page number, in which case the cutoff is the current time
    page_token = (
        dt_from_millis(int(request.page_token)) if request.page_token and not request.page_number else now()
    )
    page_number = request.page_number or 1
    # Calculate the offset for pagination
    offset = (page_number - 1) * page_size

    # the one second of slack makes the cutoff inclusive of the occurrence the token was generated from
    if not request.past:
        cutoff = page_token - timedelta(seconds=1)
        statement = statement.where(EventOccurrence.end_time > cutoff).order_by(EventOccurrence.start_time.asc())
    else:
        cutoff = page_token + timedelta(seconds=1)
        statement = statement.where(EventOccurrence.end_time < cutoff).order_by(EventOccurrence.start_time.desc())

    # counted before offset/limit are applied, so this is the size of the whole (remaining) result set
    total_items = session.execute(select(func.count()).select_from(statement.subquery())).scalar()
    # Apply pagination by page number; in token mode fetch one extra row to know whether another page exists
    statement = statement.offset(offset).limit(page_size) if request.page_number else statement.limit(page_size + 1)
    occurrences = session.execute(statement).scalars().all()

    return search_pb2.EventSearchRes(
        events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
        # only reachable in token mode: the extra row's end time becomes the next token
        next_page_token=(str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None),
        total_items=total_items,
    )