Coverage for app / backend / src / couchers / helpers / clusters.py: 93%

28 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1from collections.abc import Sequence 

2 

3from geoalchemy2.shape import from_shape 

4from shapely.geometry.base import BaseGeometry 

5from sqlalchemy.orm import Session 

6 

7from couchers.models import Cluster, ClusterRole, ClusterSubscription, Node, Page, PageType, PageVersion, Thread 

8 

9DEFAULT_PAGE_CONTENT = "There is nothing here yet..." 

10DEFAULT_PAGE_TITLE_TEMPLATE = "Main page for the {name} {type}" 

11 

12 

13def create_node(session: Session, geom: BaseGeometry, parent_node_id: int | None) -> Node: 

14 node = Node(geom=from_shape(geom), parent_node_id=parent_node_id) 

15 session.add(node) 

16 session.flush() 

17 return node 

18 

19 

20def create_cluster( 

21 session: Session, 

22 parent_node_id: int, 

23 name: str, 

24 description: str, 

25 creator_user_id: int, 

26 admin_ids: Sequence[int], 

27 is_community: bool, 

28) -> Cluster: 

29 cluster_type = "community" if is_community else "group" 

30 cluster = Cluster( 

31 name=name, 

32 description=description, 

33 parent_node_id=parent_node_id, 

34 is_official_cluster=is_community, 

35 ) 

36 session.add(cluster) 

37 session.flush() 

38 thread = Thread() 

39 session.add(thread) 

40 session.flush() 

41 main_page = Page( 

42 parent_node_id=cluster.parent_node_id, 

43 creator_user_id=creator_user_id, 

44 owner_cluster_id=cluster.id, 

45 type=PageType.main_page, 

46 thread_id=thread.id, 

47 ) 

48 session.add(main_page) 

49 session.flush() 

50 page_version = PageVersion( 

51 page_id=main_page.id, 

52 editor_user_id=creator_user_id, 

53 title=DEFAULT_PAGE_TITLE_TEMPLATE.format(name=name, type=cluster_type), 

54 content=DEFAULT_PAGE_CONTENT, 

55 ) 

56 session.add(page_version) 

57 for admin_id in admin_ids: 57 ↛ 58line 57 didn't jump to line 58 because the loop on line 57 never started

58 cluster.cluster_subscriptions.append( 

59 ClusterSubscription( 

60 user_id=admin_id, 

61 cluster_id=cluster.id, 

62 role=ClusterRole.admin, 

63 ) 

64 ) 

65 return cluster