Coverage for app / backend / src / tests / test_communities.py: 100%

675 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1from datetime import timedelta 

2 

3import grpc 

4import pytest 

5from geoalchemy2 import WKBElement 

6from google.protobuf import empty_pb2, wrappers_pb2 

7from sqlalchemy import select 

8from sqlalchemy.orm import Session 

9 

10from couchers.db import is_user_in_node_geography, session_scope 

11from couchers.materialized_views import refresh_materialized_views 

12from couchers.models import ( 

13 Cluster, 

14 ClusterRole, 

15 ClusterSubscription, 

16 Node, 

17 Page, 

18 PageType, 

19 PageVersion, 

20 SignupFlow, 

21 Thread, 

22 User, 

23) 

24from couchers.proto import api_pb2, auth_pb2, communities_pb2, discussions_pb2, events_pb2, pages_pb2 

25from couchers.tasks import enforce_community_memberships 

26from couchers.utils import Timestamp_from_datetime, create_coordinate, create_polygon_lat_lng, now, to_multi 

27from tests.fixtures.db import generate_user, get_user_id_and_token 

28from tests.fixtures.sessions import ( 

29 auth_api_session, 

30 communities_session, 

31 discussions_session, 

32 events_session, 

33 pages_session, 

34) 

35from tests.test_auth import get_session_cookie_tokens 

36 

37 

@pytest.fixture(autouse=True)
def _(testconfig):
    """Request the ``testconfig`` fixture for every test in this module."""

41 

42 

43# For testing purposes, restrict ourselves to a 1D-world, consisting of "intervals" that have width 2, and coordinates 

44# that are points at (x, 1). 

45# we'll stick to EPSG4326, even though it's not ideal, so don't use too large values, but it's around the equator, so 

46# mostly fine 

47 

48 

def create_1d_polygon(lb: int, ub: int) -> WKBElement:
    """Build the polygon that represents the 1D interval from ``lb`` to ``ub``.

    The interval is a closed rectangular ring: the first coordinate spans
    ``lb`` to ``ub`` and the second spans 0 to 2.
    """
    bottom, top = 0, 2
    ring = [
        [lb, bottom],
        [lb, top],
        [ub, top],
        [ub, bottom],
        [lb, bottom],  # repeat the first vertex to close the ring
    ]
    return create_polygon_lat_lng(ring)

52 

53 

def create_1d_point(x: int) -> WKBElement:
    """Return the coordinate ``(x, 1)``, i.e. position ``x`` in the 1D test world."""
    point = create_coordinate(x, 1)
    return point

56 

57 

def create_community(
    session: Session,
    interval_lb: int,
    interval_ub: int,
    name: str,
    admins: list[User],
    extra_members: list[User],
    parent: Node | None,
) -> Node:
    """Create an official community covering the interval ``interval_lb``..``interval_ub``.

    Adds a ``Node`` with that geometry (as a child of ``parent`` when given), its
    official ``Cluster``, a main page whose first version is authored by
    ``admins[0]``, and cluster subscriptions for ``admins`` (admin role) and
    ``extra_members`` (member role). Commits the session and returns the node.
    """
    parent_id = None
    if parent:
        parent_id = parent.id

    geometry = to_multi(create_1d_polygon(interval_lb, interval_ub))
    node = Node(geom=geometry, parent_node_id=parent_id)
    session.add(node)
    session.flush()  # node.id is read right below

    cluster = Cluster(
        name=f"{name}",
        description=f"Description for {name}",
        parent_node_id=node.id,
        is_official_cluster=True,
    )
    session.add(cluster)
    session.flush()

    thread = Thread()
    session.add(thread)
    session.flush()

    # The first admin is recorded as both creator and first editor of the main page.
    author_id = admins[0].id
    main_page = Page(
        parent_node_id=cluster.parent_node_id,
        creator_user_id=author_id,
        owner_cluster_id=cluster.id,
        type=PageType.main_page,
        thread_id=thread.id,
    )
    session.add(main_page)
    session.flush()

    first_version = PageVersion(
        page_id=main_page.id,
        editor_user_id=author_id,
        title=f"Main page for the {name} community",
        content="There is nothing here yet...",
    )
    session.add(first_version)

    # Admins are subscribed first, then the explicitly listed extra members.
    for role, users in ((ClusterRole.admin, admins), (ClusterRole.member, extra_members)):
        for user in users:
            subscription = ClusterSubscription(user_id=user.id, cluster_id=cluster.id, role=role)
            cluster.cluster_subscriptions.append(subscription)

    session.commit()
    # other members will be added by enforce_community_memberships()
    return node

119 

120 

def create_group(
    session: Session, name: str, admins: list[User], members: list[User], parent_community: Node | None
) -> Cluster:
    """Create a group (a ``Cluster`` without the official flag) under ``parent_community``.

    Also creates the group's main page with a first version authored by
    ``admins[0]``, and subscribes ``admins`` (admin role) and ``members``
    (member role). Commits the session and returns the cluster.
    """
    assert parent_community is not None

    group = Cluster(
        name=f"{name}",
        description=f"Description for {name}",
        parent_node_id=parent_community.id,
    )
    session.add(group)
    session.flush()

    thread = Thread()
    session.add(thread)
    session.flush()

    # The first admin is recorded as both creator and first editor of the main page.
    author_id = admins[0].id
    main_page = Page(
        parent_node_id=group.parent_node_id,
        creator_user_id=author_id,
        owner_cluster_id=group.id,
        type=PageType.main_page,
        thread_id=thread.id,
    )
    session.add(main_page)
    session.flush()

    first_version = PageVersion(
        page_id=main_page.id,
        editor_user_id=author_id,
        title=f"Main page for the {name} community",
        content="There is nothing here yet...",
    )
    session.add(first_version)

    # Admins are subscribed first, then the regular members.
    for role, users in ((ClusterRole.admin, admins), (ClusterRole.member, members)):
        for user in users:
            subscription = ClusterSubscription(user_id=user.id, cluster_id=group.id, role=role)
            group.cluster_subscriptions.append(subscription)

    session.commit()
    return group

169 

170 

def create_place(token: str, title: str, content: str, address: str, x: float) -> None:
    """Create a place page at ``lat=x, lng=1`` through the Pages API as the user behind ``token``."""
    with pages_session(token) as api:
        location = pages_pb2.Coordinate(lat=x, lng=1)
        request = pages_pb2.CreatePlaceReq(
            title=title,
            content=content,
            address=address,
            location=location,
        )
        api.CreatePlace(request)

184 

185 

def create_discussion(token: str, community_id: int | None, group_id: int | None, title: str, content: str) -> None:
    """Create a discussion through the Discussions API as the user behind ``token``.

    Pass ``None`` for whichever of ``community_id`` / ``group_id`` is not the owner.
    """
    with discussions_session(token) as api:
        request = discussions_pb2.CreateDiscussionReq(
            title=title,
            content=content,
            owner_community_id=community_id,
            owner_group_id=group_id,
        )
        api.CreateDiscussion(request)

197 

198 

def create_event(
    token: str, community_id: int | None, group_id: int | None, title: str, content: str, start_td: timedelta
) -> None:
    """Create a two-hour offline event starting ``start_td`` from now, then transfer its ownership.

    The event is created by the user behind ``token`` and transferred to
    ``community_id`` or ``group_id`` (pass ``None`` for the one not used).
    """
    # Read the clock once: with two separate now() calls the end time would not
    # be exactly two hours after the start time.
    start = now() + start_td
    end = start + timedelta(hours=2)

    with events_session(token) as api:
        res = api.CreateEvent(
            events_pb2.CreateEventReq(
                title=title,
                content=content,
                offline_information=events_pb2.OfflineEventInformation(
                    address="Near Null Island",
                    lat=0.1,
                    lng=0.2,
                ),
                start_time=Timestamp_from_datetime(start),
                end_time=Timestamp_from_datetime(end),
                timezone="UTC",
            )
        )
        api.TransferEvent(
            events_pb2.TransferEventReq(
                event_id=res.event_id,
                new_owner_community_id=community_id,
                new_owner_group_id=group_id,
            )
        )

224 

225 

def get_community_id(session: Session, community_name: str) -> int:
    """Return the node id of the official cluster named ``community_name``.

    Uses ``scalar_one()``, so exactly one matching row is expected.
    """
    query = select(Cluster.parent_node_id).where(
        Cluster.is_official_cluster,
        Cluster.name == community_name,
    )
    return session.execute(query).scalar_one()

230 

231 

def get_group_id(session: Session, group_name: str) -> int:
    """Return the cluster id of the non-official cluster (group) named ``group_name``.

    Uses ``scalar_one()``, so exactly one matching row is expected.
    """
    query = select(Cluster.id).where(
        ~Cluster.is_official_cluster,
        Cluster.name == group_name,
    )
    return session.execute(query).scalar_one()

236 

237 

@pytest.fixture(scope="class")
def testing_communities(db_class, testconfig):
    """Populate the class-scoped database with the 1D test world.

    Creates eight users, ten nested communities, five groups, fourteen
    discussions, twelve events and five places, runs
    ``enforce_community_memberships()`` and refreshes the materialized views
    before yielding to the tests.
    """
    # x position of user1..user8 in the 1D world
    positions = [1, 2, 3, 8, 6, 65, 80, 51]
    users = {}
    tokens = {}
    for number, x in enumerate(positions, start=1):
        users[number], tokens[number] = generate_user(
            username=f"user{number}", geom=create_1d_point(x), geom_radius=0.1
        )

    with session_scope() as session:
        world = create_community(session, 0, 100, "Global", [users[1], users[3], users[7]], [], None)
        country2 = create_community(session, 52, 100, "Country 2", [users[6], users[7]], [], world)
        region21 = create_community(session, 52, 71, "Country 2, Region 1", [users[6]], [users[8]], country2)
        create_community(session, 53, 70, "Country 2, Region 1, City 1", [users[8]], [], region21)
        country1 = create_community(session, 0, 50, "Country 1", [users[1], users[2]], [], world)
        region11 = create_community(session, 0, 10, "Country 1, Region 1", [users[1], users[2]], [], country1)
        create_community(session, 0, 5, "Country 1, Region 1, City 1", [users[2]], [], region11)
        city112 = create_community(
            session, 7, 10, "Country 1, Region 1, City 2", [users[4], users[5]], [users[2]], region11
        )
        region12 = create_community(session, 20, 25, "Country 1, Region 2", [users[2]], [], country1)
        create_community(session, 21, 23, "Country 1, Region 2, City 1", [users[2]], [], region12)

        hitchhikers = create_group(session, "Hitchhikers", [users[1], users[2]], [users[5], users[8]], world)
        create_group(session, "Country 1, Region 1, Foodies", [users[1]], [users[2], users[4]], region11)
        create_group(session, "Country 1, Region 1, Skaters", [users[2]], [users[1]], region11)
        create_group(session, "Country 1, Region 2, Foodies", [users[2]], [users[4], users[5]], region12)
        create_group(session, "Country 2, Region 1, Foodies", [users[6]], [users[7]], region21)

        # Read the ids while the session is still open.
        world_id = world.id
        city112_id = city112.id
        hitchhikers_id = hitchhikers.id
        country1_id = country1.id

    # (author number, owning community id, owning group id), in creation order
    discussion_plan = [
        (1, world_id, None),
        (3, world_id, None),
        (3, world_id, None),
        (3, world_id, None),
        (3, world_id, None),
        (3, world_id, None),
        (4, city112_id, None),
        (5, None, hitchhikers_id),
        (1, None, hitchhikers_id),
        (2, None, hitchhikers_id),
        (3, None, hitchhikers_id),
        (4, None, hitchhikers_id),
        (5, None, hitchhikers_id),
        (8, None, hitchhikers_id),
    ]
    for number, (author, community_id, group_id) in enumerate(discussion_plan, start=1):
        create_discussion(
            tokens[author], community_id, group_id, f"Discussion title {number}", f"Discussion content {number}"
        )

    # Event N starts N hours from now; the first six belong to Country 1, the rest to Hitchhikers.
    event_plan = [(author, country1_id, None) for author in (3, 1, 3, 1, 3, 1)]
    event_plan += [(2, None, hitchhikers_id)] * 6
    for number, (author, community_id, group_id) in enumerate(event_plan, start=1):
        create_event(
            tokens[author],
            community_id,
            group_id,
            f"Event title {number}",
            f"Event content {number}",
            timedelta(hours=number),
        )

    enforce_community_memberships()

    create_place(tokens[1], "Country 1, Region 1, Attraction", "Place content", "Somewhere in c1r1", 6)
    create_place(tokens[2], "Country 1, Region 1, City 1, Attraction 1", "Place content", "Somewhere in c1r1c1", 3)
    create_place(tokens[2], "Country 1, Region 1, City 1, Attraction 2", "Place content", "Somewhere in c1r1c1", 4)
    create_place(tokens[8], "Global, Attraction", "Place content", "Somewhere in w", 51.5)
    create_place(tokens[6], "Country 2, Region 1, Attraction", "Place content", "Somewhere in c2r1", 59)

    refresh_materialized_views(empty_pb2.Empty())

    yield

311 

312 

313class TestCommunities: 

@staticmethod
def test_GetCommunity(testing_communities):
    """Check GetCommunity, called as user2, for the Global, C1R1C1 and Country 2 communities."""

    def check_parents(res, expected):
        # expected: (community_id, name, slug) tuples in the order the parents are returned
        assert len(res.parents) == len(expected)
        for parent, (community_id, name, slug) in zip(res.parents, expected):
            assert parent.HasField("community")
            assert parent.community.community_id == community_id
            assert parent.community.name == name
            assert parent.community.slug == slug
            assert parent.community.description == f"Description for {name}"

    def check_main_page(page, owner_id, author_id, name, slug, privileged):
        # privileged: whether the caller is expected to be able to edit and moderate the page
        assert page.type == pages_pb2.PAGE_TYPE_MAIN_PAGE
        assert page.slug == slug
        assert page.last_editor_user_id == author_id
        assert page.creator_user_id == author_id
        assert page.owner_community_id == owner_id
        assert page.title == f"Main page for the {name} community"
        assert page.content == "There is nothing here yet..."
        assert bool(page.can_edit) == privileged
        assert bool(page.can_moderate) == privileged
        assert page.editor_user_ids == [author_id]

    with session_scope() as session:
        user1_id, _ = get_user_id_and_token(session, "user1")
        user2_id, token2 = get_user_id_and_token(session, "user2")
        user6_id, _ = get_user_id_and_token(session, "user6")
        w_id = get_community_id(session, "Global")
        c1_id = get_community_id(session, "Country 1")
        c1r1_id = get_community_id(session, "Country 1, Region 1")
        c1r1c1_id = get_community_id(session, "Country 1, Region 1, City 1")
        c2_id = get_community_id(session, "Country 2")

    global_entry = (w_id, "Global", "global")

    with communities_session(token2) as api:
        # Global: user2 is a member but not an admin.
        res = api.GetCommunity(communities_pb2.GetCommunityReq(community_id=w_id))
        assert res.name == "Global"
        assert res.slug == "global"
        assert res.description == "Description for Global"
        check_parents(res, [global_entry])
        check_main_page(res.main_page, w_id, user1_id, "Global", "main-page-for-the-global-community", False)
        assert res.member
        assert not res.admin
        assert res.member_count == 8
        assert res.admin_count == 3

        # Country 1, Region 1, City 1: user2 is its only admin.
        res = api.GetCommunity(communities_pb2.GetCommunityReq(community_id=c1r1c1_id))
        assert res.community_id == c1r1c1_id
        assert res.name == "Country 1, Region 1, City 1"
        assert res.slug == "country-1-region-1-city-1"
        assert res.description == "Description for Country 1, Region 1, City 1"
        check_parents(
            res,
            [
                global_entry,
                (c1_id, "Country 1", "country-1"),
                (c1r1_id, "Country 1, Region 1", "country-1-region-1"),
                (c1r1c1_id, "Country 1, Region 1, City 1", "country-1-region-1-city-1"),
            ],
        )
        check_main_page(
            res.main_page,
            c1r1c1_id,
            user2_id,
            "Country 1, Region 1, City 1",
            "main-page-for-the-country-1-region-1-city-1-community",
            True,
        )
        assert res.member
        assert res.admin
        assert res.member_count == 3
        assert res.admin_count == 1

        # Country 2: user2 is neither a member nor an admin.
        res = api.GetCommunity(communities_pb2.GetCommunityReq(community_id=c2_id))
        assert res.community_id == c2_id
        assert res.name == "Country 2"
        assert res.slug == "country-2"
        assert res.description == "Description for Country 2"
        check_parents(res, [global_entry, (c2_id, "Country 2", "country-2")])
        check_main_page(res.main_page, c2_id, user6_id, "Country 2", "main-page-for-the-country-2-community", False)
        assert not res.member
        assert not res.admin
        assert res.member_count == 2
        assert res.admin_count == 2

435 

@staticmethod
def test_ListCommunities(testing_communities):
    """ListCommunities for Country 1 returns its two regions, in that order."""
    with session_scope() as session:
        _, token1 = get_user_id_and_token(session, "user1")
        c1_id = get_community_id(session, "Country 1")
        expected_ids = [
            get_community_id(session, "Country 1, Region 1"),
            get_community_id(session, "Country 1, Region 2"),
        ]

    with communities_session(token1) as api:
        request = communities_pb2.ListCommunitiesReq(community_id=c1_id)
        res = api.ListCommunities(request)
        listed_ids = [community.community_id for community in res.communities]
        assert listed_ids == expected_ids

451 

@staticmethod
def test_ListCommunities_all(testing_communities):
    """ListCommunities without a community id pages through all communities ordered by name."""
    # All ten communities in the expected (by-name) order.
    ordered_names = [
        "Country 1",
        "Country 1, Region 1",
        "Country 1, Region 1, City 1",
        "Country 1, Region 1, City 2",
        "Country 1, Region 2",
        "Country 1, Region 2, City 1",
        "Country 2",
        "Country 2, Region 1",
        "Country 2, Region 1, City 1",
        "Global",
    ]
    with session_scope() as session:
        _, token1 = get_user_id_and_token(session, "user1")
        ordered_ids = [get_community_id(session, name) for name in ordered_names]

    def listed_ids(res):
        return [community.community_id for community in res.communities]

    with communities_session(token1) as api:
        # Page 1: five results.
        res = api.ListCommunities(communities_pb2.ListCommunitiesReq(page_size=5))
        assert listed_ids(res) == ordered_ids[:5]

        # Page 2: two results, continuing from the previous page.
        res = api.ListCommunities(communities_pb2.ListCommunitiesReq(page_size=2, page_token=res.next_page_token))
        assert listed_ids(res) == ordered_ids[5:7]

        # Page 3: up to five requested, only the remaining three returned.
        res = api.ListCommunities(communities_pb2.ListCommunitiesReq(page_size=5, page_token=res.next_page_token))
        assert listed_ids(res) == ordered_ids[7:]

489 

@staticmethod
def test_ListUserCommunities(testing_communities):
    """ListUserCommunities with no user id returns the caller's (user2's) communities."""
    expected_names = [
        "Global",
        "Country 1",
        "Country 1, Region 1",
        "Country 1, Region 1, City 1",
        "Country 1, Region 1, City 2",
        "Country 1, Region 2",
        "Country 1, Region 2, City 1",
    ]
    with session_scope() as session:
        _, token2 = get_user_id_and_token(session, "user2")
        expected_ids = [get_community_id(session, name) for name in expected_names]

    # Fetch user2's communities from user2's account
    with communities_session(token2) as api:
        res = api.ListUserCommunities(communities_pb2.ListUserCommunitiesReq())
        listed_ids = [community.community_id for community in res.communities]
        assert listed_ids == expected_ids

514 

@staticmethod
def test_ListOtherUserCommunities(testing_communities):
    """ListUserCommunities with ``user_id=user2`` returns user2's communities to user1."""
    expected_names = [
        "Global",
        "Country 1",
        "Country 1, Region 1",
        "Country 1, Region 1, City 1",
        "Country 1, Region 1, City 2",
        "Country 1, Region 2",
        "Country 1, Region 2, City 1",
    ]
    with session_scope() as session:
        _, token1 = get_user_id_and_token(session, "user1")
        user2_id, _ = get_user_id_and_token(session, "user2")
        expected_ids = [get_community_id(session, name) for name in expected_names]

    # Fetch user2's communities from user1's account
    with communities_session(token1) as api:
        request = communities_pb2.ListUserCommunitiesReq(user_id=user2_id)
        res = api.ListUserCommunities(request)
        listed_ids = [community.community_id for community in res.communities]
        assert listed_ids == expected_ids

540 

@staticmethod
def test_ListGroups(testing_communities):
    """ListGroups returns the groups attached to the requested community."""
    with session_scope() as session:
        _, token1 = get_user_id_and_token(session, "user1")
        _, token5 = get_user_id_and_token(session, "user5")
        w_id = get_community_id(session, "Global")
        hitchhikers_id = get_group_id(session, "Hitchhikers")
        c1r1_id = get_community_id(session, "Country 1, Region 1")
        foodies_id = get_group_id(session, "Country 1, Region 1, Foodies")
        skaters_id = get_group_id(session, "Country 1, Region 1, Skaters")

    # Country 1, Region 1 has two groups.
    with communities_session(token1) as api:
        request = communities_pb2.ListGroupsReq(community_id=c1r1_id)
        res = api.ListGroups(request)
        listed_ids = [group.group_id for group in res.groups]
        assert listed_ids == [foodies_id, skaters_id]

    # Global has only the Hitchhikers group.
    with communities_session(token5) as api:
        request = communities_pb2.ListGroupsReq(community_id=w_id)
        res = api.ListGroups(request)
        assert len(res.groups) == 1
        assert res.groups[0].group_id == hitchhikers_id

568 

@staticmethod
def test_ListAdmins(testing_communities):
    """ListAdmins returns the admin user ids of the Global and C1R1C2 communities."""
    with session_scope() as session:
        user1_id, token1 = get_user_id_and_token(session, "user1")
        user3_id, _ = get_user_id_and_token(session, "user3")
        user4_id, _ = get_user_id_and_token(session, "user4")
        user5_id, _ = get_user_id_and_token(session, "user5")
        user7_id, _ = get_user_id_and_token(session, "user7")
        w_id = get_community_id(session, "Global")
        c1r1c2_id = get_community_id(session, "Country 1, Region 1, City 2")

    # community id -> admin ids expected for it
    expectations = [
        (w_id, [user1_id, user3_id, user7_id]),
        (c1r1c2_id, [user4_id, user5_id]),
    ]
    with communities_session(token1) as api:
        for community_id, admin_ids in expectations:
            res = api.ListAdmins(communities_pb2.ListAdminsReq(community_id=community_id))
            assert res.admin_user_ids == admin_ids

594 

@staticmethod
def test_AddAdmin(testing_communities):
    """AddAdmin rejects non-moderators, non-members and existing admins, and promotes a member.

    The promotion is always undone afterwards, even if an assertion fails,
    because the class-scoped database is shared with the other tests.
    """
    with session_scope() as session:
        user4_id, token4 = get_user_id_and_token(session, "user4")
        user5_id, _ = get_user_id_and_token(session, "user5")
        user2_id, _ = get_user_id_and_token(session, "user2")
        user8_id, token8 = get_user_id_and_token(session, "user8")
        node_id = get_community_id(session, "Country 1, Region 1, City 2")

    # user8 cannot moderate this community, so the request is refused.
    with communities_session(token8) as api:
        with pytest.raises(grpc.RpcError) as err:
            api.AddAdmin(communities_pb2.AddAdminReq(community_id=node_id, user_id=user2_id))
        assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert err.value.details() == "You're not allowed to moderate that community"

    with communities_session(token4) as api:
        res = api.ListAdmins(communities_pb2.ListAdminsReq(community_id=node_id))
        assert res.admin_user_ids == [user4_id, user5_id]

        # user8 is not a member of this community.
        with pytest.raises(grpc.RpcError) as err:
            api.AddAdmin(communities_pb2.AddAdminReq(community_id=node_id, user_id=user8_id))
        assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert err.value.details() == "That user is not in the community."

        # user5 is already an admin.
        with pytest.raises(grpc.RpcError) as err:
            api.AddAdmin(communities_pb2.AddAdminReq(community_id=node_id, user_id=user5_id))
        assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert err.value.details() == "That user is already an admin."

        api.AddAdmin(communities_pb2.AddAdminReq(community_id=node_id, user_id=user2_id))
        try:
            res = api.ListAdmins(communities_pb2.ListAdminsReq(community_id=node_id))
            assert res.admin_user_ids == [user2_id, user4_id, user5_id]
        finally:
            # Cleanup because database changes do not roll back; done in `finally`
            # so a failed assertion above cannot leak the extra admin into other tests.
            api.RemoveAdmin(communities_pb2.RemoveAdminReq(community_id=node_id, user_id=user2_id))

629 

@staticmethod
def test_RemoveAdmin(testing_communities):
    """RemoveAdmin rejects non-moderators, non-members and non-admins, and removes a real admin.

    The removal is always undone afterwards, even if an assertion fails,
    because the class-scoped database is shared with the other tests.
    """
    with session_scope() as session:
        user4_id, token4 = get_user_id_and_token(session, "user4")
        user5_id, _ = get_user_id_and_token(session, "user5")
        user2_id, _ = get_user_id_and_token(session, "user2")
        user8_id, token8 = get_user_id_and_token(session, "user8")
        node_id = get_community_id(session, "Country 1, Region 1, City 2")

    # user8 cannot moderate this community, so they must not be able to remove
    # one of its admins. This exercises RemoveAdmin itself; the previous version
    # called AddAdmin here and never checked RemoveAdmin's permission handling.
    # NOTE(review): assumes RemoveAdmin reports the same error as AddAdmin — confirm against the servicer.
    with communities_session(token8) as api:
        with pytest.raises(grpc.RpcError) as err:
            api.RemoveAdmin(communities_pb2.RemoveAdminReq(community_id=node_id, user_id=user5_id))
        assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert err.value.details() == "You're not allowed to moderate that community"

    with communities_session(token4) as api:
        res = api.ListAdmins(communities_pb2.ListAdminsReq(community_id=node_id))
        assert res.admin_user_ids == [user4_id, user5_id]

        # user8 is not a member of this community.
        with pytest.raises(grpc.RpcError) as err:
            api.RemoveAdmin(communities_pb2.RemoveAdminReq(community_id=node_id, user_id=user8_id))
        assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert err.value.details() == "That user is not in the community."

        # user2 is a member but not an admin.
        with pytest.raises(grpc.RpcError) as err:
            api.RemoveAdmin(communities_pb2.RemoveAdminReq(community_id=node_id, user_id=user2_id))
        assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert err.value.details() == "That user is not an admin."

        api.RemoveAdmin(communities_pb2.RemoveAdminReq(community_id=node_id, user_id=user5_id))
        try:
            res = api.ListAdmins(communities_pb2.ListAdminsReq(community_id=node_id))
            assert res.admin_user_ids == [user4_id]
        finally:
            # Cleanup because database changes do not roll back; done in `finally`
            # so a failed assertion above cannot leave user5 demoted for other tests.
            api.AddAdmin(communities_pb2.AddAdminReq(community_id=node_id, user_id=user5_id))

664 

@staticmethod
def test_ListMembers(testing_communities):
    """ListMembers returns the member user ids of the Global and C1R1C2 communities."""
    with session_scope() as session:
        # user number -> (user id, token), fetched for user1..user8 in order
        fetched = {number: get_user_id_and_token(session, f"user{number}") for number in range(1, 9)}
        w_id = get_community_id(session, "Global")
        c1r1c2_id = get_community_id(session, "Country 1, Region 1, City 2")

    user_id = {number: pair[0] for number, pair in fetched.items()}
    token1 = fetched[1][1]

    with communities_session(token1) as api:
        # Everyone is in Global; the expected order is user8 down to user1.
        res = api.ListMembers(communities_pb2.ListMembersReq(community_id=w_id))
        assert res.member_user_ids == [user_id[number] for number in range(8, 0, -1)]

        res = api.ListMembers(communities_pb2.ListMembersReq(community_id=c1r1c2_id))
        assert res.member_user_ids == [user_id[5], user_id[4], user_id[2]]

702 

@staticmethod
def test_ListNearbyUsers(testing_communities):
    """ListNearbyUsers returns all eight users for Global and only user4 for C1R1C2."""
    with session_scope() as session:
        # user number -> (user id, token), fetched for user1..user8 in order
        fetched = {number: get_user_id_and_token(session, f"user{number}") for number in range(1, 9)}
        w_id = get_community_id(session, "Global")
        c1r1c2_id = get_community_id(session, "Country 1, Region 1, City 2")

    user_id = {number: pair[0] for number, pair in fetched.items()}
    token1 = fetched[1][1]

    with communities_session(token1) as api:
        # The expected order for Global is user1 up to user8.
        res = api.ListNearbyUsers(communities_pb2.ListNearbyUsersReq(community_id=w_id))
        assert res.nearby_user_ids == [user_id[number] for number in range(1, 9)]

        res = api.ListNearbyUsers(communities_pb2.ListNearbyUsersReq(community_id=c1r1c2_id))
        assert res.nearby_user_ids == [user_id[4]]

740 

@staticmethod
def test_ListDiscussions(testing_communities):
    """ListDiscussions pages through Global's discussions, highest-numbered title first, and lists C1R1C2's single one."""

    def expect_page(api, expected_titles, **request_fields):
        # Fetch one page, check its titles and thread info, and return the response.
        res = api.ListDiscussions(communities_pb2.ListDiscussionsReq(**request_fields))
        assert [d.title for d in res.discussions] == expected_titles
        for d in res.discussions:
            assert d.thread.thread_id > 0
            assert d.thread.num_responses == 0
        return res

    with session_scope() as session:
        _, token1 = get_user_id_and_token(session, "user1")
        w_id = get_community_id(session, "Global")
        c1r1c2_id = get_community_id(session, "Country 1, Region 1, City 2")

    with communities_session(token1) as api:
        res = expect_page(
            api,
            ["Discussion title 6", "Discussion title 5", "Discussion title 4"],
            community_id=w_id,
            page_size=3,
        )
        res = expect_page(
            api,
            ["Discussion title 3", "Discussion title 2"],
            community_id=w_id,
            page_token=res.next_page_token,
            page_size=2,
        )
        expect_page(
            api,
            ["Discussion title 1"],
            community_id=w_id,
            page_token=res.next_page_token,
            page_size=2,
        )

        # A different community, without pagination arguments.
        expect_page(api, ["Discussion title 7"], community_id=c1r1c2_id)

804 

@staticmethod
def test_is_user_in_node_geography(testing_communities):
    """Assert that user1 through user5 are all reported as inside Country 1's geography."""
    with session_scope() as session:
        country1_id = get_community_id(session, "Country 1")

        user_ids = [
            get_user_id_and_token(session, username)[0]
            for username in ("user1", "user2", "user3", "user4", "user5")
        ]

        # All these users should be in Country 1's geography
        for user_id in user_ids:
            assert is_user_in_node_geography(session, user_id, country1_id)

822 

@staticmethod
def test_ListEvents(testing_communities):
    """
    Page through the events of "Country 1" (page sizes 3, 2, 2) and check that the
    last page carries no further page token.
    """
    with session_scope() as session:
        _, token = get_user_id_and_token(session, "user1")
        country1_id = get_community_id(session, "Country 1")

    def titles_of(response):
        return [event.title for event in response.events]

    with communities_session(token) as api:
        first_req = communities_pb2.ListEventsReq(community_id=country1_id, page_size=3)
        page = api.ListEvents(first_req)
        assert titles_of(page) == ["Event title 1", "Event title 2", "Event title 3"]

        # second page, resumed from the first page's token
        page = api.ListEvents(
            communities_pb2.ListEventsReq(
                community_id=country1_id,
                page_token=page.next_page_token,
                page_size=2,
            )
        )
        assert titles_of(page) == ["Event title 4", "Event title 5"]

        # third page holds the single remaining event and ends the listing
        page = api.ListEvents(
            communities_pb2.ListEventsReq(
                community_id=country1_id,
                page_token=page.next_page_token,
                page_size=2,
            )
        )
        assert titles_of(page) == ["Event title 6"]
        assert not page.next_page_token

865 

@staticmethod
def test_empty_query_aborts(testing_communities):
    """A whitespace-only search query is rejected with INVALID_ARGUMENT."""
    with session_scope() as session:
        token = get_user_id_and_token(session, "user1")[1]

    blank_query_req = communities_pb2.SearchCommunitiesReq(query=" ")

    with communities_session(token) as api:
        with pytest.raises(grpc.RpcError) as err:
            api.SearchCommunities(blank_query_req)

        rpc_error = err.value
        assert rpc_error.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert rpc_error.details() == "Query must be at least 3 characters long."

876 

@staticmethod
def test_min_length_lt_3_aborts(testing_communities):
    """
    A query shorter than 3 characters is rejected with INVALID_ARGUMENT and the
    "query too short" message.
    """
    with session_scope() as session:
        token = get_user_id_and_token(session, "user1")[1]

    short_query_req = communities_pb2.SearchCommunitiesReq(query="zz", page_size=5)

    with communities_session(token) as api:
        with pytest.raises(grpc.RpcError) as err:
            api.SearchCommunities(short_query_req)

        rpc_error = err.value
        assert rpc_error.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert rpc_error.details() == "Query must be at least 3 characters long."

890 

@staticmethod
def test_typo_matches_existing_name(testing_communities):
    """
    A misspelled query ("Coutri 1") should still return the "Country 1" community.
    """
    with session_scope() as session:
        token = get_user_id_and_token(session, "user1")[1]
        country1_id = get_community_id(session, "Country 1")

    typo_req = communities_pb2.SearchCommunitiesReq(query="Coutri 1", page_size=5)

    with communities_session(token) as api:
        found = api.SearchCommunities(typo_req).communities
        found_ids = [community.community_id for community in found]
        assert country1_id in found_ids

904 

@staticmethod
def test_word_similarity_matches_partial_word(testing_communities):
    """
    Searching for the single word 'city' should return 'Country 1, Region 1, City 1'.
    """
    with session_scope() as session:
        token = get_user_id_and_token(session, "user1")[1]
        # named after the city (rather than "c1r1c1") for clarity
        city1_id = get_community_id(session, "Country 1, Region 1, City 1")

    partial_word_req = communities_pb2.SearchCommunitiesReq(query="city", page_size=5)

    with communities_session(token) as api:
        found = api.SearchCommunities(partial_word_req).communities
        found_ids = [community.community_id for community in found]
        assert city1_id in found_ids

918 

@staticmethod
def test_results_sorted_by_similarity(testing_communities):
    """
    Search results should come back best match first.

    For the query 'Country 1, Region', 'Country 1, Region 1' must appear before the
    deeper 'Country 1, Region 1, City 1'.
    """
    with session_scope() as session:
        token = get_user_id_and_token(session, "user1")[1]
        region_id = get_community_id(session, "Country 1, Region 1")
        city_id = get_community_id(session, "Country 1, Region 1, City 1")

    region_query_req = communities_pb2.SearchCommunitiesReq(query="Country 1, Region", page_size=5)

    with communities_session(token) as api:
        found = api.SearchCommunities(region_query_req).communities
        ranked_ids = [community.community_id for community in found]

        # both must be present before their relative order can be compared
        assert region_id in ranked_ids
        assert city_id in ranked_ids
        region_rank = ranked_ids.index(region_id)
        city_rank = ranked_ids.index(city_id)
        assert region_rank < city_rank

938 

@staticmethod
def test_no_results_returns_empty(testing_communities):
    """
    A nonsense query that should not reach the similarity threshold yields an empty list.
    """
    with session_scope() as session:
        token = get_user_id_and_token(session, "user1")[1]

    nonsense_req = communities_pb2.SearchCommunitiesReq(query="qwertyuiopasdf", page_size=5)

    with communities_session(token) as api:
        response = api.SearchCommunities(nonsense_req)
        assert response.communities == []

950 

@staticmethod
def test_ListAllCommunities(testing_communities):
    """
    ListAllCommunities should return every community together with its hierarchy
    information, with the `member` flag reflecting the calling user.
    """
    community_names = (
        "Global",
        "Country 1",
        "Country 1, Region 1",
        "Country 1, Region 1, City 1",
        "Country 1, Region 1, City 2",
        "Country 1, Region 2",
        "Country 1, Region 2, City 1",
        "Country 2",
        "Country 2, Region 1",
        "Country 2, Region 1, City 1",
    )

    with session_scope() as session:
        _, token1 = get_user_id_and_token(session, "user1")
        # user2 is looked up as well, although neither its id nor its token is used below
        get_user_id_and_token(session, "user2")
        _, token6 = get_user_id_and_token(session, "user6")
        id_of = {name: get_community_id(session, name) for name in community_names}

    w_id = id_of["Global"]
    c1_id = id_of["Country 1"]
    c1r1_id = id_of["Country 1, Region 1"]
    c1r1c1_id = id_of["Country 1, Region 1, City 1"]
    c2_id = id_of["Country 2"]

    def find(response, community_id):
        # first returned community with the given id
        return next(c for c in response.communities if c.community_id == community_id)

    # user1 is a member of several communities
    with communities_session(token1) as api:
        listing = api.ListAllCommunities(communities_pb2.ListAllCommunitiesReq())

        # all 10 communities come back, and they are exactly the ones looked up above
        assert len(listing.communities) == 10
        returned_ids = [c.community_id for c in listing.communities]
        assert set(returned_ids) == set(id_of.values())

        # every entry carries the required fields
        for entry in listing.communities:
            assert entry.community_id > 0
            assert len(entry.name) > 0
            assert len(entry.slug) > 0
            assert entry.member_count > 0
            # the member flag is a boolean
            assert isinstance(entry.member, bool)
            # parents are needed for hierarchical ordering
            assert len(entry.parents) >= 1
            # a creation timestamp is set
            assert entry.HasField("created")
            assert entry.created.seconds > 0

        # spot-check the root community
        world = find(listing, w_id)
        assert world.name == "Global"
        assert world.slug == "global"
        assert world.member  # user1 is a member
        assert world.member_count == 8
        assert len(world.parents) == 1  # only itself

        # spot-check a leaf community and its parent chain
        city1 = find(listing, c1r1c1_id)
        assert city1.name == "Country 1, Region 1, City 1"
        assert city1.slug == "country-1-region-1-city-1"
        assert city1.member  # user1 is a member
        assert city1.member_count == 3
        assert len(city1.parents) == 4  # Global, Country 1, Region 1, City 1
        expected_chain = [w_id, c1_id, c1r1_id, c1r1c1_id]
        for position, expected_id in enumerate(expected_chain):
            assert city1.parents[position].community.community_id == expected_id

    # user6 has different community memberships
    with communities_session(token6) as api:
        listing = api.ListAllCommunities(communities_pb2.ListAllCommunitiesReq())

        # still all 10 communities
        assert len(listing.communities) == 10

        # Country 2: user6 is a member
        country2 = find(listing, c2_id)
        assert country2.member
        assert country2.member_count == 2

        # Country 1: user6 is NOT a member
        country1 = find(listing, c1_id)
        assert not country1.member

        # Global: user6 is a member
        world = find(listing, w_id)
        assert world.member

1046 

1047 

def test_JoinCommunity_and_LeaveCommunity(testing_communities):
    """
    Walk through the allowed and rejected JoinCommunity/LeaveCommunity transitions for
    a user located inside "Country 1" (user1) and one located outside it (user8).
    """
    # kept as a separate test because it mutates the database
    with session_scope() as session:
        # user1 is at x=1, inside c1 (country 1)
        _, inside_token = get_user_id_and_token(session, "user1")
        # user8 is at x=51, not inside c1
        _, outside_token = get_user_id_and_token(session, "user8")
        c1_id = get_community_id(session, "Country 1")

    def is_member(api):
        return api.GetCommunity(communities_pb2.GetCommunityReq(community_id=c1_id)).member

    def join(api):
        return api.JoinCommunity(communities_pb2.JoinCommunityReq(community_id=c1_id))

    def leave(api):
        return api.LeaveCommunity(communities_pb2.LeaveCommunityReq(community_id=c1_id))

    def assert_rejected(action, api, expected_details):
        # the call must fail with FAILED_PRECONDITION and exactly this message
        with pytest.raises(grpc.RpcError) as e:
            action(api)
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == expected_details

    with communities_session(inside_token) as api:
        assert is_member(api)

        # user1 is already part of c1, cannot join
        assert_rejected(join, api, "You're already in that community.")
        assert is_member(api)

        # user1 is inside c1, cannot leave
        assert_rejected(
            leave,
            api,
            "Your location on your profile is within this community, so you cannot leave it. However, you can adjust your notifications in your account settings.",
        )
        assert is_member(api)

    with communities_session(outside_token) as api:
        assert not is_member(api)

        # user8 has not joined c1 yet, cannot leave
        assert_rejected(leave, api, "You're not in that community.")
        assert not is_member(api)

        # user8 is outside c1 and not a member, so joining works
        join(api)
        assert is_member(api)

        # user8 is now a member, so joining again is rejected
        assert_rejected(join, api, "You're already in that community.")
        assert is_member(api)

        # user8 is a member but located outside c1, so leaving works
        leave(api)
        assert not is_member(api)

1130 

1131 

def test_LeaveCommunity_regression(db):
    """
    Regression test for GitHub issue #1444. Repro:

    1. Join more than one community
    2. Leave one of them
    3. You are no longer in any community
    """
    # admin of all three communities
    admin, admin_token = generate_user(username="user1", geom=create_1d_point(200), geom_radius=0.1)
    # the user who joins and leaves
    _, joiner_token = generate_user(username="user2", geom=create_1d_point(201), geom_radius=0.1)

    with session_scope() as session:
        parent = create_community(session, 0, 100, "Community 0", [admin], [], None)
        child_a = create_community(session, 0, 50, "Community 1", [admin], [], parent)
        child_b = create_community(session, 0, 10, "Community 2", [admin], [], parent)
        # read the ids while the session is still open
        c0_id, c1_id, c2_id = parent.id, child_a.id, child_b.id

    enforce_community_memberships()

    def memberships(api):
        # member flags for (Community 0, Community 1, Community 2), queried in that order
        return [
            api.GetCommunity(communities_pb2.GetCommunityReq(community_id=community_id)).member
            for community_id in (c0_id, c1_id, c2_id)
        ]

    with communities_session(admin_token) as api:
        assert memberships(api) == [True, True, True]

    with communities_session(joiner_token) as api:
        # first check we're not in any communities
        assert memberships(api) == [False, False, False]

        # join some communities
        api.JoinCommunity(communities_pb2.JoinCommunityReq(community_id=c1_id))
        api.JoinCommunity(communities_pb2.JoinCommunityReq(community_id=c2_id))

        # check memberships
        assert memberships(api) == [False, True, True]

        # leave just c2
        api.LeaveCommunity(communities_pb2.LeaveCommunityReq(community_id=c2_id))

        # c1 membership must survive leaving c2
        assert memberships(api) == [False, True, False]

1179 

1180 

def test_enforce_community_memberships_for_user(testing_communities):
    """
    A freshly signed-up user must end up in the right communities.
    """
    account = auth_pb2.SignupAccount(
        username="frodo",
        password="a very insecure password",
        birthdate="1970-01-01",
        gender="Bot",
        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
        city="Country 1, Region 1, City 2",
        # lat=8, lng=1 is equivalent to creating this coordinate with create_coordinate(8)
        lat=8,
        lng=1,
        radius=500,
        accept_tos=True,
    )
    signup_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=account,
        feedback=auth_pb2.ContributorForm(),
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
    )

    # step 1: submit the signup form
    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(signup_req)

    # step 2: read the email token stored for this signup flow
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == res.flow_token)).scalar_one()
        email_token = flow.email_token

    # step 3: submit the email token; the interceptor of this session is used for the cookie below
    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))
    user_id = res.auth_res.user_id

    # now check the user is in the right communities
    with session_scope() as session:
        expected_ids = [
            get_community_id(session, name)
            for name in (
                "Global",
                "Country 1",
                "Country 1, Region 1",
                "Country 1, Region 1, City 2",
            )
        ]

    token, _ = get_session_cookie_tokens(metadata_interceptor)

    with communities_session(token) as api:
        res = api.ListUserCommunities(communities_pb2.ListUserCommunitiesReq())
        assert [c.community_id for c in res.communities] == expected_ids

1226 

1227 

1228# TODO: requires transferring of content 

1229 

1230# def test_ListPlaces(db, testing_communities): 

1231# pass 

1232 

1233# def test_ListGuides(db, testing_communities): 

1234# pass 

1235 

1236# def test_ListEvents(db, testing_communities): 

1237# pass