Coverage for app / backend / src / couchers / migrations / env.py: 45%

55 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 09:44 +0000

1import re 

2from logging.config import fileConfig 

3from pathlib import Path 

4from typing import Any 

5 

6from alembic import context 

7from alembic.config import Config 

8from sqlalchemy import engine_from_config, pool 

9from sqlalchemy.dialects import registry 

10from sqlalchemy.schema import MetaData 

11 

12from couchers import models 

13from couchers.config import config as couchers_config 

14 

# Register psycopg (psycopg3) as the default driver for postgresql:// URLs
registry.register("postgresql", "sqlalchemy.dialects.postgresql.psycopg", "PGDialect_psycopg")

# This is the Alembic Config object, which provides access to the values
# within the .ini file in use.
config: Config = context.config

# Alembic keeps its options in a ConfigParser, where "%" starts an
# interpolation token. A connection string may contain a literal "%" (for
# example a URL-encoded password), so it is escaped as "%%" on the way in;
# reading the option back yields the single "%" again.
config.set_main_option(
    "sqlalchemy.url",
    couchers_config["DATABASE_CONNECTION_STRING"].replace("%", "%%"),
)

23 

24 

# Interpret the config file for Python logging (this sets up the loggers).
# Skipped when the "dont_mess_up_logging" main option is set to anything other
# than its default of "dont_care".
if config.get_main_option("dont_mess_up_logging", "dont_care") == "dont_care":
    if not config.config_file_name:
        # fileConfig() needs a path; say what is missing instead of raising a
        # RuntimeError whose only message is the empty/None file name.
        raise RuntimeError(
            "Cannot configure logging: the Alembic config has no config file name "
            f"(config_file_name={config.config_file_name!r}). "
            "Set the 'dont_mess_up_logging' option to skip logging setup."
        )
    fileConfig(config.config_file_name)

31 

# Model MetaData object, needed for 'autogenerate' support.
target_metadata: MetaData = models.Base.metadata

# Other values from the config, defined by the needs of env.py, can be
# acquired with config.get_main_option("<option name>").

# Table names that include_name() rejects, read from the comma-separated
# "tables" option of the [alembic:exclude] section. Entries are stripped and
# empty ones dropped, so "a, b" yields ["a", "b"] and a missing option yields
# [] rather than [""].
exclude_tables: list[str] = [
    table_name
    for raw_name in config.get_section("alembic:exclude", {}).get("tables", "").split(",")
    if (table_name := raw_name.strip())
]

# Directory named "versions" next to this file; scanned for numbered scripts.
VERSIONS_DIR = Path(__file__).parent / "versions"

47 

48 

49def _next_ordinal() -> str: 

50 """Compute the next ordinal migration number (e.g. "0134") by scanning existing filenames.""" 

51 max_ordinal = 0 

52 for path in VERSIONS_DIR.glob("*.py"): 

53 match = re.match(r"^(\d+)_", path.name) 

54 if match: 

55 max_ordinal = max(max_ordinal, int(match.group(1))) 

56 return f"{max_ordinal + 1:04d}" 

57 

58 

def process_revision_directives(context: Any, revision: Any, directives: Any) -> None:
    """Give a new revision the next ordinal number as its ID, not a random hex string.

    Only the first entry of ``directives`` is modified. A falsy ``directives``
    (for example an empty list) is left untouched.
    """
    if not directives:
        return
    ordinal = _next_ordinal()
    directives[0].rev_id = ordinal

63 

64 

65def include_name(name: str | None, type_: str, parent_names: Any) -> bool: 

66 if type_ == "schema": 

67 return name in [None, "logging"] 

68 if type_ == "table": 

69 return name not in exclude_tables 

70 return True 

71 

72 

73def include_object(obj: Any, name: str | None, type_: str, reflected: bool, compare_to: Any) -> bool: 

74 """ 

75 Filter objects during autogenerate comparison. 

76 

77 Unlike include_name (which only filters database reflection), this hook 

78 filters BOTH database objects AND model metadata objects, which is needed 

79 to properly ignore GeoAlchemy2's auto-generated spatial indexes. 

80 """ 

81 # Filter out GeoAlchemy2 auto-generated spatial indexes (from both DB and metadata) 

82 # These indexes are named like: idx_<table>_<column> and use GIST 

83 if type_ == "index" and name is not None and name.startswith("idx_") and name.endswith("_geom"): 

84 return False 

85 return True 

86 

87 

def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    The context is configured with just the "sqlalchemy.url" option rather
    than an Engine (an Engine would be acceptable here too). Skipping Engine
    creation means no DBAPI has to be available.

    Calls to context.execute() here emit the given string to the script
    output.
    """
    offline_options = {
        "url": config.get_main_option("sqlalchemy.url"),
        "target_metadata": target_metadata,
        "literal_binds": True,
        "dialect_opts": {"paramstyle": "named"},
        "include_schemas": True,
        "include_name": include_name,
        "include_object": include_object,
        "compare_type": True,
        "process_revision_directives": process_revision_directives,
    }
    context.configure(**offline_options)

    with context.begin_transaction():
        context.run_migrations()

115 

116 

def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    An Engine is built from the "sqlalchemy."-prefixed options of the section
    named by ``config.config_ini_section``, and a connection from that Engine
    is associated with the context.
    """
    ini_options = config.get_section(config.config_ini_section, {})
    engine = engine_from_config(ini_options, prefix="sqlalchemy.", poolclass=pool.NullPool)

    with engine.connect() as connection:
        # NOTE(review): run_migrations_offline passes compare_type=True but this
        # call does not - confirm the difference is intended.
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            include_schemas=True,
            include_name=include_name,
            include_object=include_object,
            process_revision_directives=process_revision_directives,
        )
        with context.begin_transaction():
            context.run_migrations()

142 

143 

# Run the migrations in whichever mode the Alembic context reports.
if not context.is_offline_mode():
    run_migrations_online()
else:
    run_migrations_offline()