Coverage for app / backend / src / couchers / migrations / env.py: 45%
55 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 09:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 09:44 +0000
1import re
2from logging.config import fileConfig
3from pathlib import Path
4from typing import Any
6from alembic import context
7from alembic.config import Config
8from sqlalchemy import engine_from_config, pool
9from sqlalchemy.dialects import registry
10from sqlalchemy.schema import MetaData
12from couchers import models
13from couchers.config import config as couchers_config
15# Register psycopg (psycopg3) as the default driver for postgresql:// URLs
16registry.register("postgresql", "sqlalchemy.dialects.postgresql.psycopg", "PGDialect_psycopg")
18# this is the Alembic Config object, which provides
19# access to the values within the .ini file in use.
20config: Config = context.config
22config.set_main_option("sqlalchemy.url", couchers_config["DATABASE_CONNECTION_STRING"])
25# Interpret the config file for Python logging.
26# This line sets up loggers basically.
27if config.get_main_option("dont_mess_up_logging", "dont_care") == "dont_care": 27 ↛ 28line 27 didn't jump to line 28 because the condition on line 27 was never true
28 if not config.config_file_name:
29 raise RuntimeError(config.config_file_name)
30 fileConfig(config.config_file_name)
32# add your model's MetaData object here
33# for 'autogenerate' support
34# from myapp import mymodel
35# target_metadata = mymodel.Base.metadata
36target_metadata: MetaData = models.Base.metadata
38# other values from the config, defined by the needs of env.py,
39# can be acquired:
40# my_important_option = config.get_main_option("my_important_option")
41# ... etc.
44exclude_tables = config.get_section("alembic:exclude", {}).get("tables", "").split(",")
46VERSIONS_DIR = Path(__file__).parent / "versions"
49def _next_ordinal() -> str:
50 """Compute the next ordinal migration number (e.g. "0134") by scanning existing filenames."""
51 max_ordinal = 0
52 for path in VERSIONS_DIR.glob("*.py"):
53 match = re.match(r"^(\d+)_", path.name)
54 if match:
55 max_ordinal = max(max_ordinal, int(match.group(1)))
56 return f"{max_ordinal + 1:04d}"
59def process_revision_directives(context: Any, revision: Any, directives: Any) -> None:
60 """Set the revision ID to the next ordinal number instead of a random hex string."""
61 if directives:
62 directives[0].rev_id = _next_ordinal()
65def include_name(name: str | None, type_: str, parent_names: Any) -> bool:
66 if type_ == "schema":
67 return name in [None, "logging"]
68 if type_ == "table":
69 return name not in exclude_tables
70 return True
73def include_object(obj: Any, name: str | None, type_: str, reflected: bool, compare_to: Any) -> bool:
74 """
75 Filter objects during autogenerate comparison.
77 Unlike include_name (which only filters database reflection), this hook
78 filters BOTH database objects AND model metadata objects, which is needed
79 to properly ignore GeoAlchemy2's auto-generated spatial indexes.
80 """
81 # Filter out GeoAlchemy2 auto-generated spatial indexes (from both DB and metadata)
82 # These indexes are named like: idx_<table>_<column> and use GIST
83 if type_ == "index" and name is not None and name.startswith("idx_") and name.endswith("_geom"):
84 return False
85 return True
88def run_migrations_offline() -> None:
89 """Run migrations in 'offline' mode.
91 This configures the context with just a URL
92 and not an Engine, though an Engine is acceptable
93 here as well. By skipping the Engine creation
94 we don't even need a DBAPI to be available.
96 Calls to context.execute() here emit the given string to the
97 script output.
99 """
100 url = config.get_main_option("sqlalchemy.url")
101 context.configure(
102 url=url,
103 target_metadata=target_metadata,
104 literal_binds=True,
105 dialect_opts={"paramstyle": "named"},
106 include_schemas=True,
107 include_name=include_name,
108 include_object=include_object,
109 compare_type=True,
110 process_revision_directives=process_revision_directives,
111 )
113 with context.begin_transaction():
114 context.run_migrations()
117def run_migrations_online() -> None:
118 """Run migrations in 'online' mode.
120 In this scenario we need to create an Engine
121 and associate a connection with the context.
123 """
124 connectable = engine_from_config(
125 config.get_section(config.config_ini_section, {}),
126 prefix="sqlalchemy.",
127 poolclass=pool.NullPool,
128 )
130 with connectable.connect() as connection:
131 context.configure(
132 connection=connection,
133 target_metadata=target_metadata,
134 include_schemas=True,
135 include_name=include_name,
136 include_object=include_object,
137 process_revision_directives=process_revision_directives,
138 )
140 with context.begin_transaction():
141 context.run_migrations()
144if context.is_offline_mode(): 144 ↛ 145line 144 didn't jump to line 145 because the condition on line 144 was never true
145 run_migrations_offline()
146else:
147 run_migrations_online()