Coverage for app / backend / src / tests / test_communities.py: 100%

684 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-19 14:14 +0000

1from datetime import timedelta 

2 

3import grpc 

4import pytest 

5from geoalchemy2 import WKBElement 

6from google.protobuf import empty_pb2, wrappers_pb2 

7from sqlalchemy import select 

8from sqlalchemy.orm import Session 

9 

10from couchers.db import is_user_in_node_geography, session_scope 

11from couchers.helpers.clusters import CHILD_NODE_TYPE 

12from couchers.materialized_views import refresh_materialized_views 

13from couchers.models import ( 

14 Cluster, 

15 ClusterRole, 

16 ClusterSubscription, 

17 EventOccurrence, 

18 Node, 

19 Page, 

20 PageType, 

21 PageVersion, 

22 SignupFlow, 

23 Thread, 

24 User, 

25) 

26from couchers.proto import api_pb2, auth_pb2, communities_pb2, discussions_pb2, events_pb2, pages_pb2 

27from couchers.tasks import enforce_community_memberships 

28from couchers.utils import Timestamp_from_datetime, create_coordinate, create_polygon_lat_lng, now, to_multi 

29from tests.fixtures.db import generate_user, get_user_id_and_token 

30from tests.fixtures.misc import Moderator 

31from tests.fixtures.sessions import ( 

32 auth_api_session, 

33 communities_session, 

34 discussions_session, 

35 events_session, 

36 pages_session, 

37) 

38from tests.test_auth import get_session_cookie_tokens 

39 

40 

41@pytest.fixture(autouse=True) 

42def _(testconfig): 

43 pass 

44 

45 

46# For testing purposes, restrict ourselves to a 1D-world, consisting of "intervals" that have width 2, and coordinates 

47# that are points at (x, 1). 

48# we'll stick to EPSG4326, even though it's not ideal, so don't use too large values, but it's around the equator, so 

49# mostly fine 

50 

51 

52def create_1d_polygon(lb: int, ub: int) -> WKBElement: 

53 # given a lower bound and upper bound on x, creates the given interval 

54 return create_polygon_lat_lng([[lb, 0], [lb, 2], [ub, 2], [ub, 0], [lb, 0]]) 

55 

56 

57def create_1d_point(x: int) -> WKBElement: 

58 return create_coordinate(x, 1) 

59 

60 

61def create_community( 

62 session: Session, 

63 interval_lb: int, 

64 interval_ub: int, 

65 name: str, 

66 admins: list[User], 

67 extra_members: list[User], 

68 parent: Node | None, 

69) -> Node: 

70 node_type = CHILD_NODE_TYPE[parent.node_type if parent else None] 

71 node = Node( 

72 geom=to_multi(create_1d_polygon(interval_lb, interval_ub)), 

73 parent_node_id=parent.id if parent else None, 

74 node_type=node_type, 

75 ) 

76 session.add(node) 

77 session.flush() 

78 cluster = Cluster( 

79 name=f"{name}", 

80 description=f"Description for {name}", 

81 parent_node_id=node.id, 

82 is_official_cluster=True, 

83 ) 

84 session.add(cluster) 

85 session.flush() 

86 thread = Thread() 

87 session.add(thread) 

88 session.flush() 

89 main_page = Page( 

90 parent_node_id=cluster.parent_node_id, 

91 creator_user_id=admins[0].id, 

92 owner_cluster_id=cluster.id, 

93 type=PageType.main_page, 

94 thread_id=thread.id, 

95 ) 

96 session.add(main_page) 

97 session.flush() 

98 page_version = PageVersion( 

99 page_id=main_page.id, 

100 editor_user_id=admins[0].id, 

101 title=f"Main page for the {name} community", 

102 content="There is nothing here yet...", 

103 ) 

104 session.add(page_version) 

105 for admin in admins: 

106 cluster.cluster_subscriptions.append( 

107 ClusterSubscription( 

108 user_id=admin.id, 

109 cluster_id=cluster.id, 

110 role=ClusterRole.admin, 

111 ) 

112 ) 

113 for member in extra_members: 

114 cluster.cluster_subscriptions.append( 

115 ClusterSubscription( 

116 user_id=member.id, 

117 cluster_id=cluster.id, 

118 role=ClusterRole.member, 

119 ) 

120 ) 

121 session.commit() 

122 # other members will be added by enforce_community_memberships() 

123 return node 

124 

125 

126def create_group( 

127 session: Session, name: str, admins: list[User], members: list[User], parent_community: Node | None 

128) -> Cluster: 

129 assert parent_community is not None 

130 cluster = Cluster( 

131 name=f"{name}", 

132 description=f"Description for {name}", 

133 parent_node_id=parent_community.id, 

134 ) 

135 session.add(cluster) 

136 session.flush() 

137 thread = Thread() 

138 session.add(thread) 

139 session.flush() 

140 main_page = Page( 

141 parent_node_id=cluster.parent_node_id, 

142 creator_user_id=admins[0].id, 

143 owner_cluster_id=cluster.id, 

144 type=PageType.main_page, 

145 thread_id=thread.id, 

146 ) 

147 session.add(main_page) 

148 session.flush() 

149 page_version = PageVersion( 

150 page_id=main_page.id, 

151 editor_user_id=admins[0].id, 

152 title=f"Main page for the {name} community", 

153 content="There is nothing here yet...", 

154 ) 

155 session.add(page_version) 

156 for admin in admins: 

157 cluster.cluster_subscriptions.append( 

158 ClusterSubscription( 

159 user_id=admin.id, 

160 cluster_id=cluster.id, 

161 role=ClusterRole.admin, 

162 ) 

163 ) 

164 for member in members: 

165 cluster.cluster_subscriptions.append( 

166 ClusterSubscription( 

167 user_id=member.id, 

168 cluster_id=cluster.id, 

169 role=ClusterRole.member, 

170 ) 

171 ) 

172 session.commit() 

173 return cluster 

174 

175 

176def create_place(token: str, title: str, content: str, address: str, x: float) -> None: 

177 with pages_session(token) as api: 

178 api.CreatePlace( 

179 pages_pb2.CreatePlaceReq( 

180 title=title, 

181 content=content, 

182 address=address, 

183 location=pages_pb2.Coordinate( 

184 lat=x, 

185 lng=1, 

186 ), 

187 ) 

188 ) 

189 

190 

191def create_discussion(token: str, community_id: int | None, group_id: int | None, title: str, content: str) -> None: 

192 # set group_id or community_id to None 

193 with discussions_session(token) as api: 

194 api.CreateDiscussion( 

195 discussions_pb2.CreateDiscussionReq( 

196 title=title, 

197 content=content, 

198 owner_community_id=community_id, 

199 owner_group_id=group_id, 

200 ) 

201 ) 

202 

203 

204def create_event( 

205 token: str, community_id: int | None, group_id: int | None, title: str, content: str, start_td: timedelta 

206) -> None: 

207 with events_session(token) as api: 

208 res = api.CreateEvent( 

209 events_pb2.CreateEventReq( 

210 title=title, 

211 content=content, 

212 offline_information=events_pb2.OfflineEventInformation( 

213 address="Near Null Island", 

214 lat=0.1, 

215 lng=0.2, 

216 ), 

217 start_time=Timestamp_from_datetime(now() + start_td), 

218 end_time=Timestamp_from_datetime(now() + start_td + timedelta(hours=2)), 

219 timezone="UTC", 

220 ) 

221 ) 

222 api.TransferEvent( 

223 events_pb2.TransferEventReq( 

224 event_id=res.event_id, 

225 new_owner_community_id=community_id, 

226 new_owner_group_id=group_id, 

227 ) 

228 ) 

229 

230 

231def get_community_id(session: Session, community_name: str) -> int: 

232 return session.execute( 

233 select(Cluster.parent_node_id).where(Cluster.is_official_cluster).where(Cluster.name == community_name) 

234 ).scalar_one() 

235 

236 

237def get_group_id(session: Session, group_name: str) -> int: 

238 return session.execute( 

239 select(Cluster.id).where(~Cluster.is_official_cluster).where(Cluster.name == group_name) 

240 ).scalar_one() 

241 

242 

243@pytest.fixture(scope="class") 

244def testing_communities(db_class, testconfig): 

245 user1, token1 = generate_user(username="user1", geom=create_1d_point(1), geom_radius=0.1) 

246 user2, token2 = generate_user(username="user2", geom=create_1d_point(2), geom_radius=0.1) 

247 user3, token3 = generate_user(username="user3", geom=create_1d_point(3), geom_radius=0.1) 

248 user4, token4 = generate_user(username="user4", geom=create_1d_point(8), geom_radius=0.1) 

249 user5, token5 = generate_user(username="user5", geom=create_1d_point(6), geom_radius=0.1) 

250 user6, token6 = generate_user(username="user6", geom=create_1d_point(65), geom_radius=0.1) 

251 user7, token7 = generate_user(username="user7", geom=create_1d_point(80), geom_radius=0.1) 

252 user8, token8 = generate_user(username="user8", geom=create_1d_point(51), geom_radius=0.1) 

253 

254 with session_scope() as session: 

255 w = create_community(session, 0, 100, "Global", [user1, user3, user7], [], None) 

256 c2 = create_community(session, 52, 100, "Country 2", [user6, user7], [], w) 

257 c2r1 = create_community(session, 52, 71, "Country 2, Region 1", [user6], [user8], c2) 

258 c2r1c1 = create_community(session, 53, 70, "Country 2, Region 1, City 1", [user8], [], c2r1) 

259 c1 = create_community(session, 0, 50, "Country 1", [user1, user2], [], w) 

260 c1r1 = create_community(session, 0, 10, "Country 1, Region 1", [user1, user2], [], c1) 

261 c1r1c1 = create_community(session, 0, 5, "Country 1, Region 1, City 1", [user2], [], c1r1) 

262 c1r1c2 = create_community(session, 7, 10, "Country 1, Region 1, City 2", [user4, user5], [user2], c1r1) 

263 c1r2 = create_community(session, 20, 25, "Country 1, Region 2", [user2], [], c1) 

264 c1r2c1 = create_community(session, 21, 23, "Country 1, Region 2, City 1", [user2], [], c1r2) 

265 

266 h = create_group(session, "Hitchhikers", [user1, user2], [user5, user8], w) 

267 create_group(session, "Country 1, Region 1, Foodies", [user1], [user2, user4], c1r1) 

268 create_group(session, "Country 1, Region 1, Skaters", [user2], [user1], c1r1) 

269 create_group(session, "Country 1, Region 2, Foodies", [user2], [user4, user5], c1r2) 

270 create_group(session, "Country 2, Region 1, Foodies", [user6], [user7], c2r1) 

271 

272 w_id = w.id 

273 c1r1c2_id = c1r1c2.id 

274 h_id = h.id 

275 c1_id = c1.id 

276 

277 create_discussion(token1, w_id, None, "Discussion title 1", "Discussion content 1") 

278 create_discussion(token3, w_id, None, "Discussion title 2", "Discussion content 2") 

279 create_discussion(token3, w_id, None, "Discussion title 3", "Discussion content 3") 

280 create_discussion(token3, w_id, None, "Discussion title 4", "Discussion content 4") 

281 create_discussion(token3, w_id, None, "Discussion title 5", "Discussion content 5") 

282 create_discussion(token3, w_id, None, "Discussion title 6", "Discussion content 6") 

283 create_discussion(token4, c1r1c2_id, None, "Discussion title 7", "Discussion content 7") 

284 create_discussion(token5, None, h_id, "Discussion title 8", "Discussion content 8") 

285 create_discussion(token1, None, h_id, "Discussion title 9", "Discussion content 9") 

286 create_discussion(token2, None, h_id, "Discussion title 10", "Discussion content 10") 

287 create_discussion(token3, None, h_id, "Discussion title 11", "Discussion content 11") 

288 create_discussion(token4, None, h_id, "Discussion title 12", "Discussion content 12") 

289 create_discussion(token5, None, h_id, "Discussion title 13", "Discussion content 13") 

290 create_discussion(token8, None, h_id, "Discussion title 14", "Discussion content 14") 

291 

292 create_event(token3, c1_id, None, "Event title 1", "Event content 1", timedelta(hours=1)) 

293 create_event(token1, c1_id, None, "Event title 2", "Event content 2", timedelta(hours=2)) 

294 create_event(token3, c1_id, None, "Event title 3", "Event content 3", timedelta(hours=3)) 

295 create_event(token1, c1_id, None, "Event title 4", "Event content 4", timedelta(hours=4)) 

296 create_event(token3, c1_id, None, "Event title 5", "Event content 5", timedelta(hours=5)) 

297 create_event(token1, c1_id, None, "Event title 6", "Event content 6", timedelta(hours=6)) 

298 create_event(token2, None, h_id, "Event title 7", "Event content 7", timedelta(hours=7)) 

299 create_event(token2, None, h_id, "Event title 8", "Event content 8", timedelta(hours=8)) 

300 create_event(token2, None, h_id, "Event title 9", "Event content 9", timedelta(hours=9)) 

301 create_event(token2, None, h_id, "Event title 10", "Event content 10", timedelta(hours=10)) 

302 create_event(token2, None, h_id, "Event title 11", "Event content 11", timedelta(hours=11)) 

303 create_event(token2, None, h_id, "Event title 12", "Event content 12", timedelta(hours=12)) 

304 

305 # Approve all events for visibility (UMS starts events as SHADOWED) 

306 mod_user, mod_token = generate_user(is_superuser=True) 

307 mod = Moderator(mod_user, mod_token) 

308 with session_scope() as session: 

309 occurrence_ids = session.execute(select(EventOccurrence.id)).scalars().all() 

310 for oid in occurrence_ids: 

311 mod.approve_event_occurrence(oid) 

312 

313 enforce_community_memberships() 

314 

315 create_place(token1, "Country 1, Region 1, Attraction", "Place content", "Somewhere in c1r1", 6) 

316 create_place(token2, "Country 1, Region 1, City 1, Attraction 1", "Place content", "Somewhere in c1r1c1", 3) 

317 create_place(token2, "Country 1, Region 1, City 1, Attraction 2", "Place content", "Somewhere in c1r1c1", 4) 

318 create_place(token8, "Global, Attraction", "Place content", "Somewhere in w", 51.5) 

319 create_place(token6, "Country 2, Region 1, Attraction", "Place content", "Somewhere in c2r1", 59) 

320 

321 refresh_materialized_views(empty_pb2.Empty()) 

322 

323 yield 

324 

325 

326class TestCommunities: 

327 @staticmethod 

328 def test_GetCommunity(testing_communities): 

329 with session_scope() as session: 

330 user1_id, token1 = get_user_id_and_token(session, "user1") 

331 user2_id, token2 = get_user_id_and_token(session, "user2") 

332 user6_id, token6 = get_user_id_and_token(session, "user6") 

333 w_id = get_community_id(session, "Global") 

334 c1_id = get_community_id(session, "Country 1") 

335 c1r1_id = get_community_id(session, "Country 1, Region 1") 

336 c1r1c1_id = get_community_id(session, "Country 1, Region 1, City 1") 

337 c2_id = get_community_id(session, "Country 2") 

338 

339 with communities_session(token2) as api: 

340 res = api.GetCommunity( 

341 communities_pb2.GetCommunityReq( 

342 community_id=w_id, 

343 ) 

344 ) 

345 assert res.name == "Global" 

346 assert res.slug == "global" 

347 assert res.description == "Description for Global" 

348 assert len(res.parents) == 1 

349 assert res.parents[0].HasField("community") 

350 assert res.parents[0].community.community_id == w_id 

351 assert res.parents[0].community.name == "Global" 

352 assert res.parents[0].community.slug == "global" 

353 assert res.parents[0].community.description == "Description for Global" 

354 assert res.main_page.type == pages_pb2.PAGE_TYPE_MAIN_PAGE 

355 assert res.main_page.slug == "main-page-for-the-global-community" 

356 assert res.main_page.last_editor_user_id == user1_id 

357 assert res.main_page.creator_user_id == user1_id 

358 assert res.main_page.owner_community_id == w_id 

359 assert res.main_page.title == "Main page for the Global community" 

360 assert res.main_page.content == "There is nothing here yet..." 

361 assert not res.main_page.can_edit 

362 assert not res.main_page.can_moderate 

363 assert res.main_page.editor_user_ids == [user1_id] 

364 assert res.member 

365 assert not res.admin 

366 assert res.member_count == 8 

367 assert res.admin_count == 3 

368 

369 res = api.GetCommunity( 

370 communities_pb2.GetCommunityReq( 

371 community_id=c1r1c1_id, 

372 ) 

373 ) 

374 assert res.community_id == c1r1c1_id 

375 assert res.name == "Country 1, Region 1, City 1" 

376 assert res.slug == "country-1-region-1-city-1" 

377 assert res.description == "Description for Country 1, Region 1, City 1" 

378 assert len(res.parents) == 4 

379 assert res.parents[0].HasField("community") 

380 assert res.parents[0].community.community_id == w_id 

381 assert res.parents[0].community.name == "Global" 

382 assert res.parents[0].community.slug == "global" 

383 assert res.parents[0].community.description == "Description for Global" 

384 assert res.parents[1].HasField("community") 

385 assert res.parents[1].community.community_id == c1_id 

386 assert res.parents[1].community.name == "Country 1" 

387 assert res.parents[1].community.slug == "country-1" 

388 assert res.parents[1].community.description == "Description for Country 1" 

389 assert res.parents[2].HasField("community") 

390 assert res.parents[2].community.community_id == c1r1_id 

391 assert res.parents[2].community.name == "Country 1, Region 1" 

392 assert res.parents[2].community.slug == "country-1-region-1" 

393 assert res.parents[2].community.description == "Description for Country 1, Region 1" 

394 assert res.parents[3].HasField("community") 

395 assert res.parents[3].community.community_id == c1r1c1_id 

396 assert res.parents[3].community.name == "Country 1, Region 1, City 1" 

397 assert res.parents[3].community.slug == "country-1-region-1-city-1" 

398 assert res.parents[3].community.description == "Description for Country 1, Region 1, City 1" 

399 assert res.main_page.type == pages_pb2.PAGE_TYPE_MAIN_PAGE 

400 assert res.main_page.slug == "main-page-for-the-country-1-region-1-city-1-community" 

401 assert res.main_page.last_editor_user_id == user2_id 

402 assert res.main_page.creator_user_id == user2_id 

403 assert res.main_page.owner_community_id == c1r1c1_id 

404 assert res.main_page.title == "Main page for the Country 1, Region 1, City 1 community" 

405 assert res.main_page.content == "There is nothing here yet..." 

406 assert res.main_page.can_edit 

407 assert res.main_page.can_moderate 

408 assert res.main_page.editor_user_ids == [user2_id] 

409 assert res.member 

410 assert res.admin 

411 assert res.member_count == 3 

412 assert res.admin_count == 1 

413 

414 res = api.GetCommunity( 

415 communities_pb2.GetCommunityReq( 

416 community_id=c2_id, 

417 ) 

418 ) 

419 assert res.community_id == c2_id 

420 assert res.name == "Country 2" 

421 assert res.slug == "country-2" 

422 assert res.description == "Description for Country 2" 

423 assert len(res.parents) == 2 

424 assert res.parents[0].HasField("community") 

425 assert res.parents[0].community.community_id == w_id 

426 assert res.parents[0].community.name == "Global" 

427 assert res.parents[0].community.slug == "global" 

428 assert res.parents[0].community.description == "Description for Global" 

429 assert res.parents[1].HasField("community") 

430 assert res.parents[1].community.community_id == c2_id 

431 assert res.parents[1].community.name == "Country 2" 

432 assert res.parents[1].community.slug == "country-2" 

433 assert res.parents[1].community.description == "Description for Country 2" 

434 assert res.main_page.type == pages_pb2.PAGE_TYPE_MAIN_PAGE 

435 assert res.main_page.slug == "main-page-for-the-country-2-community" 

436 assert res.main_page.last_editor_user_id == user6_id 

437 assert res.main_page.creator_user_id == user6_id 

438 assert res.main_page.owner_community_id == c2_id 

439 assert res.main_page.title == "Main page for the Country 2 community" 

440 assert res.main_page.content == "There is nothing here yet..." 

441 assert not res.main_page.can_edit 

442 assert not res.main_page.can_moderate 

443 assert res.main_page.editor_user_ids == [user6_id] 

444 assert not res.member 

445 assert not res.admin 

446 assert res.member_count == 2 

447 assert res.admin_count == 2 

448 

449 @staticmethod 

450 def test_ListCommunities(testing_communities): 

451 with session_scope() as session: 

452 user1_id, token1 = get_user_id_and_token(session, "user1") 

453 c1_id = get_community_id(session, "Country 1") 

454 c1r1_id = get_community_id(session, "Country 1, Region 1") 

455 c1r2_id = get_community_id(session, "Country 1, Region 2") 

456 

457 with communities_session(token1) as api: 

458 res = api.ListCommunities( 

459 communities_pb2.ListCommunitiesReq( 

460 community_id=c1_id, 

461 ) 

462 ) 

463 assert [c.community_id for c in res.communities] == [c1r1_id, c1r2_id] 

464 

465 @staticmethod 

466 def test_ListCommunities_all(testing_communities): 

467 with session_scope() as session: 

468 user1_id, token1 = get_user_id_and_token(session, "user1") 

469 w_id = get_community_id(session, "Global") 

470 c1_id = get_community_id(session, "Country 1") 

471 c1r1_id = get_community_id(session, "Country 1, Region 1") 

472 c1r1c1_id = get_community_id(session, "Country 1, Region 1, City 1") 

473 c1r1c2_id = get_community_id(session, "Country 1, Region 1, City 2") 

474 c1r2_id = get_community_id(session, "Country 1, Region 2") 

475 c1r2c1_id = get_community_id(session, "Country 1, Region 2, City 1") 

476 c2_id = get_community_id(session, "Country 2") 

477 c2r1_id = get_community_id(session, "Country 2, Region 1") 

478 c2r1c1_id = get_community_id(session, "Country 2, Region 1, City 1") 

479 

480 # Fetch all communities ordered by name 

481 with communities_session(token1) as api: 

482 res = api.ListCommunities( 

483 communities_pb2.ListCommunitiesReq( 

484 page_size=5, 

485 ) 

486 ) 

487 assert [c.community_id for c in res.communities] == [c1_id, c1r1_id, c1r1c1_id, c1r1c2_id, c1r2_id] 

488 res = api.ListCommunities( 

489 communities_pb2.ListCommunitiesReq( 

490 page_size=2, 

491 page_token=res.next_page_token, 

492 ) 

493 ) 

494 assert [c.community_id for c in res.communities] == [c1r2c1_id, c2_id] 

495 res = api.ListCommunities( 

496 communities_pb2.ListCommunitiesReq( 

497 page_size=5, 

498 page_token=res.next_page_token, 

499 ) 

500 ) 

501 assert [c.community_id for c in res.communities] == [c2r1_id, c2r1c1_id, w_id] 

502 

503 @staticmethod 

504 def test_ListUserCommunities(testing_communities): 

505 with session_scope() as session: 

506 user2_id, token2 = get_user_id_and_token(session, "user2") 

507 w_id = get_community_id(session, "Global") 

508 c1_id = get_community_id(session, "Country 1") 

509 c1r1_id = get_community_id(session, "Country 1, Region 1") 

510 c1r1c1_id = get_community_id(session, "Country 1, Region 1, City 1") 

511 c1r1c2_id = get_community_id(session, "Country 1, Region 1, City 2") 

512 c1r2_id = get_community_id(session, "Country 1, Region 2") 

513 c1r2c1_id = get_community_id(session, "Country 1, Region 2, City 1") 

514 

515 # Fetch user2's communities from user2's account 

516 with communities_session(token2) as api: 

517 res = api.ListUserCommunities(communities_pb2.ListUserCommunitiesReq()) 

518 assert [c.community_id for c in res.communities] == [ 

519 w_id, 

520 c1_id, 

521 c1r1_id, 

522 c1r1c1_id, 

523 c1r1c2_id, 

524 c1r2_id, 

525 c1r2c1_id, 

526 ] 

527 

528 @staticmethod 

529 def test_ListOtherUserCommunities(testing_communities): 

530 with session_scope() as session: 

531 user1_id, token1 = get_user_id_and_token(session, "user1") 

532 user2_id, token2 = get_user_id_and_token(session, "user2") 

533 w_id = get_community_id(session, "Global") 

534 c1_id = get_community_id(session, "Country 1") 

535 c1r1_id = get_community_id(session, "Country 1, Region 1") 

536 c1r1c1_id = get_community_id(session, "Country 1, Region 1, City 1") 

537 c1r1c2_id = get_community_id(session, "Country 1, Region 1, City 2") 

538 c1r2_id = get_community_id(session, "Country 1, Region 2") 

539 c1r2c1_id = get_community_id(session, "Country 1, Region 2, City 1") 

540 

541 # Fetch user2's communities from user1's account 

542 with communities_session(token1) as api: 

543 res = api.ListUserCommunities(communities_pb2.ListUserCommunitiesReq(user_id=user2_id)) 

544 assert [c.community_id for c in res.communities] == [ 

545 w_id, 

546 c1_id, 

547 c1r1_id, 

548 c1r1c1_id, 

549 c1r1c2_id, 

550 c1r2_id, 

551 c1r2c1_id, 

552 ] 

553 

554 @staticmethod 

555 def test_ListGroups(testing_communities): 

556 with session_scope() as session: 

557 user1_id, token1 = get_user_id_and_token(session, "user1") 

558 user5_id, token5 = get_user_id_and_token(session, "user5") 

559 w_id = get_community_id(session, "Global") 

560 hitchhikers_id = get_group_id(session, "Hitchhikers") 

561 c1r1_id = get_community_id(session, "Country 1, Region 1") 

562 foodies_id = get_group_id(session, "Country 1, Region 1, Foodies") 

563 skaters_id = get_group_id(session, "Country 1, Region 1, Skaters") 

564 

565 with communities_session(token1) as api: 

566 res = api.ListGroups( 

567 communities_pb2.ListGroupsReq( 

568 community_id=c1r1_id, 

569 ) 

570 ) 

571 assert [g.group_id for g in res.groups] == [foodies_id, skaters_id] 

572 

573 with communities_session(token5) as api: 

574 res = api.ListGroups( 

575 communities_pb2.ListGroupsReq( 

576 community_id=w_id, 

577 ) 

578 ) 

579 assert len(res.groups) == 1 

580 assert res.groups[0].group_id == hitchhikers_id 

581 

582 @staticmethod 

583 def test_ListAdmins(testing_communities): 

584 with session_scope() as session: 

585 user1_id, token1 = get_user_id_and_token(session, "user1") 

586 user3_id, token3 = get_user_id_and_token(session, "user3") 

587 user4_id, token4 = get_user_id_and_token(session, "user4") 

588 user5_id, token5 = get_user_id_and_token(session, "user5") 

589 user7_id, token7 = get_user_id_and_token(session, "user7") 

590 w_id = get_community_id(session, "Global") 

591 c1r1c2_id = get_community_id(session, "Country 1, Region 1, City 2") 

592 

593 with communities_session(token1) as api: 

594 res = api.ListAdmins( 

595 communities_pb2.ListAdminsReq( 

596 community_id=w_id, 

597 ) 

598 ) 

599 assert res.admin_user_ids == [user1_id, user3_id, user7_id] 

600 

601 res = api.ListAdmins( 

602 communities_pb2.ListAdminsReq( 

603 community_id=c1r1c2_id, 

604 ) 

605 ) 

606 assert res.admin_user_ids == [user4_id, user5_id] 

607 

608 @staticmethod 

609 def test_AddAdmin(testing_communities): 

610 with session_scope() as session: 

611 user4_id, token4 = get_user_id_and_token(session, "user4") 

612 user5_id, _ = get_user_id_and_token(session, "user5") 

613 user2_id, _ = get_user_id_and_token(session, "user2") 

614 user8_id, token8 = get_user_id_and_token(session, "user8") 

615 node_id = get_community_id(session, "Country 1, Region 1, City 2") 

616 

617 with communities_session(token8) as api: 

618 with pytest.raises(grpc.RpcError) as err: 

619 api.AddAdmin(communities_pb2.AddAdminReq(community_id=node_id, user_id=user2_id)) 

620 assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

621 assert err.value.details() == "You're not allowed to moderate that community" 

622 

623 with communities_session(token4) as api: 

624 res = api.ListAdmins(communities_pb2.ListAdminsReq(community_id=node_id)) 

625 assert res.admin_user_ids == [user4_id, user5_id] 

626 

627 with pytest.raises(grpc.RpcError) as err: 

628 api.AddAdmin(communities_pb2.AddAdminReq(community_id=node_id, user_id=user8_id)) 

629 assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

630 assert err.value.details() == "That user is not in the community." 

631 

632 with pytest.raises(grpc.RpcError) as err: 

633 api.AddAdmin(communities_pb2.AddAdminReq(community_id=node_id, user_id=user5_id)) 

634 assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

635 assert err.value.details() == "That user is already an admin." 

636 

637 api.AddAdmin(communities_pb2.AddAdminReq(community_id=node_id, user_id=user2_id)) 

638 res = api.ListAdmins(communities_pb2.ListAdminsReq(community_id=node_id)) 

639 assert res.admin_user_ids == [user2_id, user4_id, user5_id] 

640 # Cleanup because database changes do not roll back 

641 api.RemoveAdmin(communities_pb2.RemoveAdminReq(community_id=node_id, user_id=user2_id)) 

642 

643 @staticmethod 

644 def test_RemoveAdmin(testing_communities): 

645 with session_scope() as session: 

646 user4_id, token4 = get_user_id_and_token(session, "user4") 

647 user5_id, _ = get_user_id_and_token(session, "user5") 

648 user2_id, _ = get_user_id_and_token(session, "user2") 

649 user8_id, token8 = get_user_id_and_token(session, "user8") 

650 node_id = get_community_id(session, "Country 1, Region 1, City 2") 

651 

652 with communities_session(token8) as api: 

653 with pytest.raises(grpc.RpcError) as err: 

654 api.AddAdmin(communities_pb2.AddAdminReq(community_id=node_id, user_id=user2_id)) 

655 assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

656 assert err.value.details() == "You're not allowed to moderate that community" 

657 

658 with communities_session(token4) as api: 

659 res = api.ListAdmins(communities_pb2.ListAdminsReq(community_id=node_id)) 

660 assert res.admin_user_ids == [user4_id, user5_id] 

661 

662 with pytest.raises(grpc.RpcError) as err: 

663 api.RemoveAdmin(communities_pb2.RemoveAdminReq(community_id=node_id, user_id=user8_id)) 

664 assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

665 assert err.value.details() == "That user is not in the community." 

666 

667 with pytest.raises(grpc.RpcError) as err: 

668 api.RemoveAdmin(communities_pb2.RemoveAdminReq(community_id=node_id, user_id=user2_id)) 

669 assert err.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

670 assert err.value.details() == "That user is not an admin." 

671 

672 api.RemoveAdmin(communities_pb2.RemoveAdminReq(community_id=node_id, user_id=user5_id)) 

673 res = api.ListAdmins(communities_pb2.ListAdminsReq(community_id=node_id)) 

674 assert res.admin_user_ids == [user4_id] 

675 # Cleanup because database changes do not roll back 

676 api.AddAdmin(communities_pb2.AddAdminReq(community_id=node_id, user_id=user5_id)) 

677 

678 @staticmethod 

679 def test_ListMembers(testing_communities): 

680 with session_scope() as session: 

681 user1_id, token1 = get_user_id_and_token(session, "user1") 

682 user2_id, token2 = get_user_id_and_token(session, "user2") 

683 user3_id, token3 = get_user_id_and_token(session, "user3") 

684 user4_id, token4 = get_user_id_and_token(session, "user4") 

685 user5_id, token5 = get_user_id_and_token(session, "user5") 

686 user6_id, token6 = get_user_id_and_token(session, "user6") 

687 user7_id, token7 = get_user_id_and_token(session, "user7") 

688 user8_id, token8 = get_user_id_and_token(session, "user8") 

689 w_id = get_community_id(session, "Global") 

690 c1r1c2_id = get_community_id(session, "Country 1, Region 1, City 2") 

691 

692 with communities_session(token1) as api: 

693 res = api.ListMembers( 

694 communities_pb2.ListMembersReq( 

695 community_id=w_id, 

696 ) 

697 ) 

698 assert res.member_user_ids == [ 

699 user8_id, 

700 user7_id, 

701 user6_id, 

702 user5_id, 

703 user4_id, 

704 user3_id, 

705 user2_id, 

706 user1_id, 

707 ] 

708 

709 res = api.ListMembers( 

710 communities_pb2.ListMembersReq( 

711 community_id=c1r1c2_id, 

712 ) 

713 ) 

714 assert res.member_user_ids == [user5_id, user4_id, user2_id] 

715 

716 @staticmethod 

717 def test_ListNearbyUsers(testing_communities): 

718 with session_scope() as session: 

719 user1_id, token1 = get_user_id_and_token(session, "user1") 

720 user2_id, token2 = get_user_id_and_token(session, "user2") 

721 user3_id, token3 = get_user_id_and_token(session, "user3") 

722 user4_id, token4 = get_user_id_and_token(session, "user4") 

723 user5_id, token5 = get_user_id_and_token(session, "user5") 

724 user6_id, token6 = get_user_id_and_token(session, "user6") 

725 user7_id, token7 = get_user_id_and_token(session, "user7") 

726 user8_id, token8 = get_user_id_and_token(session, "user8") 

727 w_id = get_community_id(session, "Global") 

728 c1r1c2_id = get_community_id(session, "Country 1, Region 1, City 2") 

729 

730 with communities_session(token1) as api: 

731 res = api.ListNearbyUsers( 

732 communities_pb2.ListNearbyUsersReq( 

733 community_id=w_id, 

734 ) 

735 ) 

736 assert res.nearby_user_ids == [ 

737 user1_id, 

738 user2_id, 

739 user3_id, 

740 user4_id, 

741 user5_id, 

742 user6_id, 

743 user7_id, 

744 user8_id, 

745 ] 

746 

747 res = api.ListNearbyUsers( 

748 communities_pb2.ListNearbyUsersReq( 

749 community_id=c1r1c2_id, 

750 ) 

751 ) 

752 assert res.nearby_user_ids == [user4_id] 

753 

754 @staticmethod 

755 def test_ListDiscussions(testing_communities): 

756 with session_scope() as session: 

757 user1_id, token1 = get_user_id_and_token(session, "user1") 

758 w_id = get_community_id(session, "Global") 

759 c1r1c2_id = get_community_id(session, "Country 1, Region 1, City 2") 

760 

761 with communities_session(token1) as api: 

762 res = api.ListDiscussions( 

763 communities_pb2.ListDiscussionsReq( 

764 community_id=w_id, 

765 page_size=3, 

766 ) 

767 ) 

768 assert [d.title for d in res.discussions] == [ 

769 "Discussion title 6", 

770 "Discussion title 5", 

771 "Discussion title 4", 

772 ] 

773 for d in res.discussions: 

774 assert d.thread.thread_id > 0 

775 assert d.thread.num_responses == 0 

776 

777 res = api.ListDiscussions( 

778 communities_pb2.ListDiscussionsReq( 

779 community_id=w_id, 

780 page_token=res.next_page_token, 

781 page_size=2, 

782 ) 

783 ) 

784 assert [d.title for d in res.discussions] == [ 

785 "Discussion title 3", 

786 "Discussion title 2", 

787 ] 

788 for d in res.discussions: 

789 assert d.thread.thread_id > 0 

790 assert d.thread.num_responses == 0 

791 

792 res = api.ListDiscussions( 

793 communities_pb2.ListDiscussionsReq( 

794 community_id=w_id, 

795 page_token=res.next_page_token, 

796 page_size=2, 

797 ) 

798 ) 

799 assert [d.title for d in res.discussions] == [ 

800 "Discussion title 1", 

801 ] 

802 for d in res.discussions: 

803 assert d.thread.thread_id > 0 

804 assert d.thread.num_responses == 0 

805 

806 res = api.ListDiscussions( 

807 communities_pb2.ListDiscussionsReq( 

808 community_id=c1r1c2_id, 

809 ) 

810 ) 

811 assert [d.title for d in res.discussions] == [ 

812 "Discussion title 7", 

813 ] 

814 for d in res.discussions: 

815 assert d.thread.thread_id > 0 

816 assert d.thread.num_responses == 0 

817 

818 @staticmethod 

819 def test_is_user_in_node_geography(testing_communities): 

820 with session_scope() as session: 

821 c1_id = get_community_id(session, "Country 1") 

822 

823 user1_id, _ = get_user_id_and_token(session, "user1") 

824 user2_id, _ = get_user_id_and_token(session, "user2") 

825 user3_id, _ = get_user_id_and_token(session, "user3") 

826 user4_id, _ = get_user_id_and_token(session, "user4") 

827 user5_id, _ = get_user_id_and_token(session, "user5") 

828 

829 # All these users should be in Country 1's geography 

830 assert is_user_in_node_geography(session, user1_id, c1_id) 

831 assert is_user_in_node_geography(session, user2_id, c1_id) 

832 assert is_user_in_node_geography(session, user3_id, c1_id) 

833 assert is_user_in_node_geography(session, user4_id, c1_id) 

834 assert is_user_in_node_geography(session, user5_id, c1_id) 

835 

836 @staticmethod 

837 def test_ListEvents(testing_communities): 

838 with session_scope() as session: 

839 user1_id, token1 = get_user_id_and_token(session, "user1") 

840 c1_id = get_community_id(session, "Country 1") 

841 

842 with communities_session(token1) as api: 

843 res = api.ListEvents( 

844 communities_pb2.ListEventsReq( 

845 community_id=c1_id, 

846 page_size=3, 

847 ) 

848 ) 

849 assert [d.title for d in res.events] == [ 

850 "Event title 1", 

851 "Event title 2", 

852 "Event title 3", 

853 ] 

854 

855 res = api.ListEvents( 

856 communities_pb2.ListEventsReq( 

857 community_id=c1_id, 

858 page_token=res.next_page_token, 

859 page_size=2, 

860 ) 

861 ) 

862 assert [d.title for d in res.events] == [ 

863 "Event title 4", 

864 "Event title 5", 

865 ] 

866 

867 res = api.ListEvents( 

868 communities_pb2.ListEventsReq( 

869 community_id=c1_id, 

870 page_token=res.next_page_token, 

871 page_size=2, 

872 ) 

873 ) 

874 assert [d.title for d in res.events] == [ 

875 "Event title 6", 

876 ] 

877 assert not res.next_page_token 

878 

879 @staticmethod 

880 def test_empty_query_aborts(testing_communities): 

881 with session_scope() as session: 

882 _, token = get_user_id_and_token(session, "user1") 

883 

884 with communities_session(token) as api: 

885 with pytest.raises(grpc.RpcError) as err: 

886 api.SearchCommunities(communities_pb2.SearchCommunitiesReq(query=" ")) 

887 assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

888 assert err.value.details() == "Query must be at least 3 characters long." 

889 

890 @staticmethod 

891 def test_min_length_lt_3_aborts(testing_communities): 

892 """ 

893 len(query) < 3 → return INVALID_ARGUMENT: query_too_short 

894 """ 

895 with session_scope() as session: 

896 _, token = get_user_id_and_token(session, "user1") 

897 

898 with communities_session(token) as api: 

899 with pytest.raises(grpc.RpcError) as err: 

900 api.SearchCommunities(communities_pb2.SearchCommunitiesReq(query="zz", page_size=5)) 

901 assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

902 assert err.value.details() == "Query must be at least 3 characters long." 

903 

904 @staticmethod 

905 def test_typo_matches_existing_name(testing_communities): 

906 """ 

907 Word_similarity should match a simple typo in community name. 

908 """ 

909 with session_scope() as session: 

910 _, token = get_user_id_and_token(session, "user1") 

911 c1_id = get_community_id(session, "Country 1") 

912 

913 with communities_session(token) as api: 

914 res = api.SearchCommunities(communities_pb2.SearchCommunitiesReq(query="Coutri 1", page_size=5)) 

915 ids = [c.community_id for c in res.communities] 

916 assert c1_id in ids 

917 

918 @staticmethod 

919 def test_word_similarity_matches_partial_word(testing_communities): 

920 """ 

921 Query 'city' should match 'Country 1, Region 1, City 1'. 

922 """ 

923 with session_scope() as session: 

924 _, token = get_user_id_and_token(session, "user1") 

925 city1_id = get_community_id(session, "Country 1, Region 1, City 1") # переименовал для ясности 

926 

927 with communities_session(token) as api: 

928 res = api.SearchCommunities(communities_pb2.SearchCommunitiesReq(query="city", page_size=5)) 

929 ids = [c.community_id for c in res.communities] 

930 assert city1_id in ids 

931 

932 @staticmethod 

933 def test_results_sorted_by_similarity(testing_communities): 

934 """ 

935 Results should be ordered by similarity score (best match first). 

936 For query 'Country 1, Region', the full region name should rank higher 

937 than deeper descendants like 'City 1'. 

938 """ 

939 with session_scope() as session: 

940 _, token = get_user_id_and_token(session, "user1") 

941 region_id = get_community_id(session, "Country 1, Region 1") 

942 city_id = get_community_id(session, "Country 1, Region 1, City 1") 

943 

944 with communities_session(token) as api: 

945 res = api.SearchCommunities(communities_pb2.SearchCommunitiesReq(query="Country 1, Region", page_size=5)) 

946 ids = [c.community_id for c in res.communities] 

947 

948 assert region_id in ids 

949 assert city_id in ids 

950 assert ids.index(region_id) < ids.index(city_id) 

951 

952 @staticmethod 

953 def test_no_results_returns_empty(testing_communities): 

954 """ 

955 For a nonsense query that shouldn't meet the similarity threshold, return empty list. 

956 """ 

957 with session_scope() as session: 

958 _, token = get_user_id_and_token(session, "user1") 

959 

960 with communities_session(token) as api: 

961 res = api.SearchCommunities(communities_pb2.SearchCommunitiesReq(query="qwertyuiopasdf", page_size=5)) 

962 assert res.communities == [] 

963 

964 @staticmethod 

965 def test_ListAllCommunities(testing_communities): 

966 """ 

967 Test that ListAllCommunities returns all communities with proper hierarchy information. 

968 """ 

969 with session_scope() as session: 

970 user1_id, token1 = get_user_id_and_token(session, "user1") 

971 user2_id, token2 = get_user_id_and_token(session, "user2") 

972 user6_id, token6 = get_user_id_and_token(session, "user6") 

973 w_id = get_community_id(session, "Global") 

974 c1_id = get_community_id(session, "Country 1") 

975 c1r1_id = get_community_id(session, "Country 1, Region 1") 

976 c1r1c1_id = get_community_id(session, "Country 1, Region 1, City 1") 

977 c1r1c2_id = get_community_id(session, "Country 1, Region 1, City 2") 

978 c1r2_id = get_community_id(session, "Country 1, Region 2") 

979 c1r2c1_id = get_community_id(session, "Country 1, Region 2, City 1") 

980 c2_id = get_community_id(session, "Country 2") 

981 c2r1_id = get_community_id(session, "Country 2, Region 1") 

982 c2r1c1_id = get_community_id(session, "Country 2, Region 1, City 1") 

983 

984 # Test with user1 who is a member of multiple communities 

985 with communities_session(token1) as api: 

986 res = api.ListAllCommunities(communities_pb2.ListAllCommunitiesReq()) 

987 

988 # Should return all 10 communities 

989 assert len(res.communities) == 10 

990 

991 # Get all community IDs 

992 community_ids = [c.community_id for c in res.communities] 

993 assert set(community_ids) == { 

994 w_id, 

995 c1_id, 

996 c1r1_id, 

997 c1r1c1_id, 

998 c1r1c2_id, 

999 c1r2_id, 

1000 c1r2c1_id, 

1001 c2_id, 

1002 c2r1_id, 

1003 c2r1c1_id, 

1004 } 

1005 

1006 # Check that each community has the required fields 

1007 for community in res.communities: 

1008 assert community.community_id > 0 

1009 assert len(community.name) > 0 

1010 assert len(community.slug) > 0 

1011 assert community.member_count > 0 

1012 # member field should be a boolean 

1013 assert isinstance(community.member, bool) 

1014 # parents should be present for hierarchical ordering 

1015 assert len(community.parents) >= 1 

1016 # created timestamp should be present 

1017 assert community.HasField("created") 

1018 assert community.created.seconds > 0 

1019 

1020 # Find specific communities and verify their data 

1021 global_community = next(c for c in res.communities if c.community_id == w_id) 

1022 assert global_community.name == "Global" 

1023 assert global_community.slug == "global" 

1024 assert global_community.member # user1 is a member 

1025 assert global_community.member_count == 8 

1026 assert len(global_community.parents) == 1 # Only itself 

1027 

1028 c1r1c1_community = next(c for c in res.communities if c.community_id == c1r1c1_id) 

1029 assert c1r1c1_community.name == "Country 1, Region 1, City 1" 

1030 assert c1r1c1_community.slug == "country-1-region-1-city-1" 

1031 assert c1r1c1_community.member # user1 is a member 

1032 assert c1r1c1_community.member_count == 3 

1033 assert len(c1r1c1_community.parents) == 4 # Global, Country 1, Region 1, City 1 

1034 # Verify parent hierarchy 

1035 assert c1r1c1_community.parents[0].community.community_id == w_id 

1036 assert c1r1c1_community.parents[1].community.community_id == c1_id 

1037 assert c1r1c1_community.parents[2].community.community_id == c1r1_id 

1038 assert c1r1c1_community.parents[3].community.community_id == c1r1c1_id 

1039 

1040 # Test with user6 who has different community memberships 

1041 with communities_session(token6) as api: 

1042 res = api.ListAllCommunities(communities_pb2.ListAllCommunitiesReq()) 

1043 

1044 # Should still return all 10 communities 

1045 assert len(res.communities) == 10 

1046 

1047 # Find Country 2 community - user6 should be a member 

1048 c2_community = next(c for c in res.communities if c.community_id == c2_id) 

1049 assert c2_community.member # user6 is a member 

1050 assert c2_community.member_count == 2 

1051 

1052 # Find Country 1 - user6 should NOT be a member 

1053 c1_community = next(c for c in res.communities if c.community_id == c1_id) 

1054 assert not c1_community.member # user6 is not a member 

1055 

1056 # Global - user6 should be a member 

1057 global_community = next(c for c in res.communities if c.community_id == w_id) 

1058 assert global_community.member # user6 is a member 

1059 

1060 

1061def test_JoinCommunity_and_LeaveCommunity(testing_communities): 

1062 # these are separate as they mutate the database 

1063 with session_scope() as session: 

1064 # at x=1, inside c1 (country 1) 

1065 user1_id, token1 = get_user_id_and_token(session, "user1") 

1066 # at x=51, not inside c1 

1067 user8_id, token8 = get_user_id_and_token(session, "user8") 

1068 c1_id = get_community_id(session, "Country 1") 

1069 

1070 with communities_session(token1) as api: 

1071 assert api.GetCommunity(communities_pb2.GetCommunityReq(community_id=c1_id)).member 

1072 

1073 # user1 is already part of c1, cannot join 

1074 with pytest.raises(grpc.RpcError) as e: 

1075 res = api.JoinCommunity( 

1076 communities_pb2.JoinCommunityReq( 

1077 community_id=c1_id, 

1078 ) 

1079 ) 

1080 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

1081 assert e.value.details() == "You're already in that community." 

1082 

1083 assert api.GetCommunity(communities_pb2.GetCommunityReq(community_id=c1_id)).member 

1084 

1085 # user1 is inside c1, cannot leave 

1086 with pytest.raises(grpc.RpcError) as e: 

1087 res = api.LeaveCommunity( 

1088 communities_pb2.LeaveCommunityReq( 

1089 community_id=c1_id, 

1090 ) 

1091 ) 

1092 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

1093 assert ( 

1094 e.value.details() 

1095 == "Your location on your profile is within this community, so you cannot leave it. However, you can adjust your notifications in your account settings." 

1096 ) 

1097 

1098 assert api.GetCommunity(communities_pb2.GetCommunityReq(community_id=c1_id)).member 

1099 

1100 with communities_session(token8) as api: 

1101 assert not api.GetCommunity(communities_pb2.GetCommunityReq(community_id=c1_id)).member 

1102 

1103 # user8 is not in c1 yet, cannot leave 

1104 with pytest.raises(grpc.RpcError) as e: 

1105 res = api.LeaveCommunity( 

1106 communities_pb2.LeaveCommunityReq( 

1107 community_id=c1_id, 

1108 ) 

1109 ) 

1110 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

1111 assert e.value.details() == "You're not in that community." 

1112 

1113 assert not api.GetCommunity(communities_pb2.GetCommunityReq(community_id=c1_id)).member 

1114 

1115 # user8 is not in c1 and not part, can join 

1116 res = api.JoinCommunity( 

1117 communities_pb2.JoinCommunityReq( 

1118 community_id=c1_id, 

1119 ) 

1120 ) 

1121 

1122 assert api.GetCommunity(communities_pb2.GetCommunityReq(community_id=c1_id)).member 

1123 

1124 # user8 is not in c1 and but now part, can't join again 

1125 with pytest.raises(grpc.RpcError) as e: 

1126 res = api.JoinCommunity( 

1127 communities_pb2.JoinCommunityReq( 

1128 community_id=c1_id, 

1129 ) 

1130 ) 

1131 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

1132 assert e.value.details() == "You're already in that community." 

1133 

1134 assert api.GetCommunity(communities_pb2.GetCommunityReq(community_id=c1_id)).member 

1135 

1136 # user8 is not in c1 yet, but part of it, can leave 

1137 res = api.LeaveCommunity( 

1138 communities_pb2.LeaveCommunityReq( 

1139 community_id=c1_id, 

1140 ) 

1141 ) 

1142 assert not api.GetCommunity(communities_pb2.GetCommunityReq(community_id=c1_id)).member 

1143 

1144 

1145def test_LeaveCommunity_regression(db): 

1146 # See github issue #1444, repro: 

1147 # 1. Join more than one community 

1148 # 2. Leave one of them 

1149 # 3. You are no longer in any community 

1150 # admin 

1151 user1, token1 = generate_user(username="user1", geom=create_1d_point(200), geom_radius=0.1) 

1152 # joiner/leaver 

1153 user2, token2 = generate_user(username="user2", geom=create_1d_point(201), geom_radius=0.1) 

1154 

1155 with session_scope() as session: 

1156 c0 = create_community(session, 0, 100, "Community 0", [user1], [], None) 

1157 c1 = create_community(session, 0, 50, "Community 1", [user1], [], c0) 

1158 c2 = create_community(session, 0, 10, "Community 2", [user1], [], c0) 

1159 c0_id = c0.id 

1160 c1_id = c1.id 

1161 c2_id = c2.id 

1162 

1163 enforce_community_memberships() 

1164 

1165 with communities_session(token1) as api: 

1166 assert api.GetCommunity(communities_pb2.GetCommunityReq(community_id=c0_id)).member 

1167 assert api.GetCommunity(communities_pb2.GetCommunityReq(community_id=c1_id)).member 

1168 assert api.GetCommunity(communities_pb2.GetCommunityReq(community_id=c2_id)).member 

1169 

1170 with communities_session(token2) as api: 

1171 # first check we're not in any communities 

1172 assert not api.GetCommunity(communities_pb2.GetCommunityReq(community_id=c0_id)).member 

1173 assert not api.GetCommunity(communities_pb2.GetCommunityReq(community_id=c1_id)).member 

1174 assert not api.GetCommunity(communities_pb2.GetCommunityReq(community_id=c2_id)).member 

1175 

1176 # join some communities 

1177 api.JoinCommunity(communities_pb2.JoinCommunityReq(community_id=c1_id)) 

1178 api.JoinCommunity(communities_pb2.JoinCommunityReq(community_id=c2_id)) 

1179 

1180 # check memberships 

1181 assert not api.GetCommunity(communities_pb2.GetCommunityReq(community_id=c0_id)).member 

1182 assert api.GetCommunity(communities_pb2.GetCommunityReq(community_id=c1_id)).member 

1183 assert api.GetCommunity(communities_pb2.GetCommunityReq(community_id=c2_id)).member 

1184 

1185 # leave just c2 

1186 api.LeaveCommunity(communities_pb2.LeaveCommunityReq(community_id=c2_id)) 

1187 

1188 # check memberships 

1189 assert not api.GetCommunity(communities_pb2.GetCommunityReq(community_id=c0_id)).member 

1190 assert api.GetCommunity(communities_pb2.GetCommunityReq(community_id=c1_id)).member 

1191 assert not api.GetCommunity(communities_pb2.GetCommunityReq(community_id=c2_id)).member 

1192 

1193 

1194def test_enforce_community_memberships_for_user(testing_communities): 

1195 """ 

1196 Make sure the user is added to the right communities on signup 

1197 """ 

1198 with auth_api_session() as (auth_api, metadata_interceptor): 

1199 res = auth_api.SignupFlow( 

1200 auth_pb2.SignupFlowReq( 

1201 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"), 

1202 account=auth_pb2.SignupAccount( 

1203 username="frodo", 

1204 password="a very insecure password", 

1205 birthdate="1970-01-01", 

1206 gender="Bot", 

1207 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

1208 city="Country 1, Region 1, City 2", 

1209 # lat=8, lng=1 is equivalent to creating this coordinate with create_coordinate(8) 

1210 lat=8, 

1211 lng=1, 

1212 radius=500, 

1213 accept_tos=True, 

1214 ), 

1215 feedback=auth_pb2.ContributorForm(), 

1216 accept_community_guidelines=wrappers_pb2.BoolValue(value=True), 

1217 motivations=auth_pb2.SignupMotivations(motivations=["surfing"]), 

1218 ) 

1219 ) 

1220 with session_scope() as session: 

1221 email_token = ( 

1222 session.execute(select(SignupFlow).where(SignupFlow.flow_token == res.flow_token)).scalar_one().email_token 

1223 ) 

1224 with auth_api_session() as (auth_api, metadata_interceptor): 

1225 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token)) 

1226 user_id = res.auth_res.user_id 

1227 

1228 # now check the user is in the right communities 

1229 with session_scope() as session: 

1230 w_id = get_community_id(session, "Global") 

1231 c1_id = get_community_id(session, "Country 1") 

1232 c1r1_id = get_community_id(session, "Country 1, Region 1") 

1233 c1r1c2_id = get_community_id(session, "Country 1, Region 1, City 2") 

1234 

1235 token, _ = get_session_cookie_tokens(metadata_interceptor) 

1236 

1237 with communities_session(token) as api: 

1238 res = api.ListUserCommunities(communities_pb2.ListUserCommunitiesReq()) 

1239 assert [c.community_id for c in res.communities] == [w_id, c1_id, c1r1_id, c1r1c2_id] 

1240 

1241 

1242# TODO: requires transferring of content 

1243 

1244# def test_ListPlaces(db, testing_communities): 

1245# pass 

1246 

1247# def test_ListGuides(db, testing_communities): 

1248# pass 

1249 

1250# def test_ListEvents(db, testing_communities): 

1251# pass