Coverage for app / backend / src / couchers / helpers / clusters.py: 93%
28 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
1from collections.abc import Sequence
3from geoalchemy2.shape import from_shape
4from shapely.geometry.base import BaseGeometry
5from sqlalchemy.orm import Session
7from couchers.models import Cluster, ClusterRole, ClusterSubscription, Node, Page, PageType, PageVersion, Thread
# Placeholder body text stored in the first version of a cluster's main page.
DEFAULT_PAGE_CONTENT: str = "There is nothing here yet..."

# Title template for a cluster's main page; filled in with str.format(name=..., type=...),
# where `type` is the literal "community" or "group".
DEFAULT_PAGE_TITLE_TEMPLATE: str = "Main page for the {name} {type}"
def create_node(session: Session, geom: BaseGeometry, parent_node_id: int | None) -> Node:
    """Create a Node from a shapely geometry, add it to the session and flush.

    Args:
        session: Session the new node is added to and flushed on.
        geom: Shapely geometry; converted with ``from_shape`` before being stored on the node.
        parent_node_id: Value stored as the node's ``parent_node_id``; may be ``None``.

    Returns:
        The newly created Node, after the session has been flushed.
    """
    geometry_element = from_shape(geom)
    new_node = Node(geom=geometry_element, parent_node_id=parent_node_id)
    session.add(new_node)
    # Flush before returning (presumably so the node's generated id is populated for the caller).
    session.flush()
    return new_node
def create_cluster(
    session: Session,
    parent_node_id: int,
    name: str,
    description: str,
    creator_user_id: int,
    admin_ids: Sequence[int],
    is_community: bool,
) -> Cluster:
    """Create a cluster along with its thread, main page, first page version and admin subscriptions.

    The cluster, the thread and the main page are each flushed right after being added, because the
    following object references their ``id``. The page version and the admin subscriptions are attached
    but not flushed by this function.

    Args:
        session: Session that all new objects are added to.
        parent_node_id: Stored as the cluster's ``parent_node_id``; the main page reuses the cluster's value.
        name: Cluster name; also interpolated into the main page title.
        description: Cluster description.
        creator_user_id: Recorded as the main page's creator and as the editor of its first version.
        admin_ids: User ids that each receive a ``ClusterRole.admin`` subscription to the cluster.
        is_community: Stored as ``is_official_cluster``; also selects "community" vs "group" in the page title.

    Returns:
        The newly created Cluster.
    """
    if is_community:
        kind_label = "community"
    else:
        kind_label = "group"

    new_cluster = Cluster(
        name=name,
        description=description,
        parent_node_id=parent_node_id,
        is_official_cluster=is_community,
    )
    session.add(new_cluster)
    session.flush()

    page_thread = Thread()
    session.add(page_thread)
    session.flush()

    landing_page = Page(
        parent_node_id=new_cluster.parent_node_id,
        creator_user_id=creator_user_id,
        owner_cluster_id=new_cluster.id,
        type=PageType.main_page,
        thread_id=page_thread.id,
    )
    session.add(landing_page)
    session.flush()

    initial_title = DEFAULT_PAGE_TITLE_TEMPLATE.format(name=name, type=kind_label)
    initial_version = PageVersion(
        page_id=landing_page.id,
        editor_user_id=creator_user_id,
        title=initial_title,
        content=DEFAULT_PAGE_CONTENT,
    )
    session.add(initial_version)

    # The subscriptions collection is only touched inside the loop, so an empty admin list never accesses it.
    for admin_user_id in admin_ids:
        admin_subscription = ClusterSubscription(
            user_id=admin_user_id,
            cluster_id=new_cluster.id,
            role=ClusterRole.admin,
        )
        new_cluster.cluster_subscriptions.append(admin_subscription)

    return new_cluster