Coverage for app / backend / src / couchers / migrations / env.py: 50%

40 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1from logging.config import fileConfig 

2from typing import Any 

3 

4from alembic import context 

5from alembic.config import Config 

6from sqlalchemy import engine_from_config, pool 

7from sqlalchemy.schema import MetaData 

8 

9from couchers import models 

10from couchers.config import config as couchers_config 

11 

# This is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config: Config = context.config

# Main options are stored through ConfigParser, which treats "%" as the start
# of an interpolation. Escape it so a connection string containing URL-encoded
# characters (e.g. "%40" in a password) is stored and read back unchanged.
config.set_main_option(
    "sqlalchemy.url",
    couchers_config["DATABASE_CONNECTION_STRING"].replace("%", "%%"),
)


# Interpret the config file for Python logging (this sets up the loggers),
# unless the "dont_mess_up_logging" main option was set to some other value.
if config.get_main_option("dont_mess_up_logging", "dont_care") == "dont_care":
    if not config.config_file_name:
        # Without a config file there is nothing to read logging settings from.
        raise RuntimeError("Alembic config_file_name is not set; cannot configure logging from it")
    fileConfig(config.config_file_name)

25 

# The models' MetaData object, handed to Alembic for 'autogenerate' support.
target_metadata: MetaData = models.Base.metadata

# Other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.


# Comma-separated table names from the "tables" option of the
# [alembic:exclude] section; include_name rejects these tables.
# Whitespace around each entry is stripped and empty entries are dropped, so
# "a, b" excludes both tables and a missing option yields an empty list.
exclude_tables: list[str] = [
    table.strip()
    for table in config.get_section("alembic:exclude", {}).get("tables", "").split(",")
    if table.strip()
]

39 

40 

41def include_name(name: str | None, type_: str, parent_names: Any) -> bool: 

42 if type_ == "schema": 

43 return name in [None, "logging"] 

44 if type_ == "table": 

45 return name not in exclude_tables 

46 return True 

47 

48 

49def include_object(obj: Any, name: str | None, type_: str, reflected: bool, compare_to: Any) -> bool: 

50 """ 

51 Filter objects during autogenerate comparison. 

52 

53 Unlike include_name (which only filters database reflection), this hook 

54 filters BOTH database objects AND model metadata objects, which is needed 

55 to properly ignore GeoAlchemy2's auto-generated spatial indexes. 

56 """ 

57 # Filter out GeoAlchemy2 auto-generated spatial indexes (from both DB and metadata) 

58 # These indexes are named like: idx_<table>_<column> and use GIST 

59 if type_ == "index" and name is not None and name.startswith("idx_") and name.endswith("_geom"): 

60 return False 

61 return True 

62 

63 

def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    The context is configured with only the "sqlalchemy.url" main option
    rather than an Engine (an Engine would be acceptable too), so no DBAPI
    needs to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    context.configure(
        url=config.get_main_option("sqlalchemy.url"),
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
        include_schemas=True,
        include_name=include_name,
        include_object=include_object,
        compare_type=True,
    )

    with context.begin_transaction():
        context.run_migrations()

90 

91 

def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    The Engine is built from the ini section named by
    config.config_ini_section (options prefixed "sqlalchemy.") and uses
    NullPool.

    """
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            include_schemas=True,
            include_name=include_name,
            include_object=include_object,
            # Passed explicitly so this path agrees with run_migrations_offline.
            compare_type=True,
        )

        with context.begin_transaction():
            context.run_migrations()

116 

117 

# Pick the runner matching the mode Alembic was invoked in, then execute it.
_run_migrations = run_migrations_offline if context.is_offline_mode() else run_migrations_online
_run_migrations()