Coverage for app / backend / src / couchers / experimentation.py: 25%
65 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
1"""
2Experimentation framework for feature flags and experiments.
4Uses Statsig under the hood, but abstracts the implementation details.
6IMPORTANT - FORKING SAFETY:
7The underlying SDK uses internal threading and async runtime components that
8do NOT work correctly when copied across process boundaries during fork().
9Initializing before forking will cause deadlocks and unpredictable behavior.
11This module provides fork-safe initialization:
12- Call `setup_experimentation()` ONLY in child processes AFTER forking
13- Call `setup_experimentation()` in the main process ONLY AFTER all child processes have been spawned
14- NEVER call setup_experimentation() at module load time
16The Couchers backend uses multiprocessing.Process for background workers, which forks.
17The initialization flow is:
181. app.py starts, spawns worker processes via multiprocessing.Process (fork happens)
192. Each worker process calls `_run_forever()` which calls `setup_experimentation()` post-fork
203. The main process (API server) calls `setup_experimentation()` after spawning workers
21"""
23import atexit
24import logging
25from typing import TYPE_CHECKING
27from statsig_python_core import Statsig, StatsigOptions, StatsigUser
29from couchers.config import config
31if TYPE_CHECKING:
32 from couchers.context import CouchersContext
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Track whether we've initialized in this process to prevent double-initialization.
# setup_experimentation() sets this to True once it has run, including when
# experimentation is disabled in config.
_initialized: bool = False
class ExperimentationNotInitializedError(Exception):
    """
    Signals that the experimentation API was used before it was set up.

    Raised when experimentation is enabled in config but
    `setup_experimentation()` has not yet run in the current process.
    """
def setup_experimentation() -> None:
    """
    Initialize the experimentation framework. Must be called AFTER process forking.

    This function is safe to call multiple times - subsequent calls will be no-ops.

    Call this:
    - In worker processes: inside _run_forever() after db_post_fork()
    - In main process: after spawning all worker processes

    IMPORTANT: Importing this module is safe before forking. Only calling this
    function (which initializes internal threads) must happen after forking.

    When EXPERIMENTATION_ENABLED is false, no client is created; the process is
    only marked as initialized so that later calls return immediately.
    """
    global _initialized

    if _initialized:
        logger.debug("Experimentation already initialized in this process, skipping")
        return

    if not config["EXPERIMENTATION_ENABLED"]:
        logger.info("Experimentation is disabled, skipping initialization")
        _initialized = True
        return

    logger.info("Initializing experimentation framework")

    options = StatsigOptions()
    options.environment = config["STATSIG_ENVIRONMENT"]

    # Create the shared instance for global access
    statsig = Statsig.new_shared(config["STATSIG_SERVER_SECRET_KEY"], options)

    # initialize() starts internal threads - this MUST happen after forking
    statsig.initialize().wait()

    # Register the shutdown hook as soon as the client is live, so pending
    # events are still flushed at exit even if the integration check below raises.
    atexit.register(_shutdown_experimentation)

    _initialized = True
    logger.info("Experimentation framework initialized successfully")

    # Verify the integration works by checking a test gate
    test_user = StatsigUser(user_id="integration_test")
    test_gate_result = statsig.check_gate(test_user, "test_statsig_integration")
    logger.info(
        "Experimentation integration test: gate 'test_statsig_integration' = %s",
        test_gate_result,
    )
def _shutdown_experimentation() -> None:
    """
    Tear down the shared experimentation client, flushing any pending events.

    Registered with atexit by setup_experimentation(), so it runs automatically
    when the process exits. Does nothing when experimentation is disabled or
    when no shared client instance exists.
    """
    enabled = config["EXPERIMENTATION_ENABLED"]

    # Short-circuit keeps the SDK untouched when experimentation is disabled.
    if enabled and Statsig.has_shared_instance():
        logger.info("Shutting down experimentation framework")
        Statsig.shared().shutdown().wait()
        Statsig.remove_shared()
def _check_initialized() -> None:
    """
    Guard used by the public helpers: ensure setup has run when it needs to.

    Raises:
        ExperimentationNotInitializedError: If experimentation is enabled but not initialized.
    """
    if not config["EXPERIMENTATION_ENABLED"]:
        # Nothing to initialize when the feature is switched off.
        return

    if _initialized:
        return

    raise ExperimentationNotInitializedError(
        "Experimentation is not initialized - call setup_experimentation() first"
    )
def _get_statsig_user(context: CouchersContext) -> StatsigUser:
    """
    Return the StatsigUser for this context, building it on first use.

    The user object is stored on the context as `_statsig_user`, so repeated
    calls with the same context reuse one instance instead of recreating it.
    """
    missing = object()

    if getattr(context, "_statsig_user", missing) is missing:
        user_key = str(context.user_id)
        context._statsig_user = StatsigUser(user_id=user_key)  # type: ignore[attr-defined]

    return context._statsig_user  # type: ignore[attr-defined, no-any-return]
def check_gate(context: CouchersContext, gate_name: str) -> bool:
    """
    Tell whether a feature gate is open for the user in this context.

    Args:
        context: The CouchersContext for the current request
        gate_name: The name of the feature gate

    Returns:
        True if EXPERIMENTATION_PASS_ALL_GATES is set (checked first).
        Otherwise False if experimentation is disabled.
        Otherwise the gate's value for this user.

    Raises:
        ExperimentationNotInitializedError: If experimentation is enabled but not initialized.
    """
    _check_initialized()

    # The pass-all override wins over everything else, including "disabled".
    if config["EXPERIMENTATION_PASS_ALL_GATES"]:
        return True

    if config["EXPERIMENTATION_ENABLED"]:
        client = Statsig.shared()
        statsig_user = _get_statsig_user(context)
        return client.check_gate(statsig_user, gate_name)

    return False
def get_experiment(context: CouchersContext, experiment_name: str) -> dict[str, object]:
    """
    Look up the experiment values assigned to the user in this context.

    Args:
        context: The CouchersContext for the current request
        experiment_name: The name of the experiment

    Returns:
        A dictionary with experiment values.
        An empty dict if experimentation is disabled or the lookup result is falsy.

    Raises:
        ExperimentationNotInitializedError: If experimentation is enabled but not initialized.
    """
    _check_initialized()

    if config["EXPERIMENTATION_ENABLED"]:
        client = Statsig.shared()
        statsig_user = _get_statsig_user(context)
        # TODO: remove type: ignore when upstream fixes types, see https://github.com/statsig-io/statsig-server-core/issues/36
        experiment = client.get_experiment(statsig_user, experiment_name)  # type: ignore[attr-defined]
        if experiment:
            return experiment.value

    return {}
def get_dynamic_config(context: CouchersContext, config_name: str) -> dict[str, object]:
    """
    Look up a dynamic config as it applies to the user in this context.

    Args:
        context: The CouchersContext for the current request
        config_name: The name of the dynamic config

    Returns:
        A dictionary with config values.
        An empty dict if experimentation is disabled or the lookup result is falsy.

    Raises:
        ExperimentationNotInitializedError: If experimentation is enabled but not initialized.
    """
    _check_initialized()

    enabled = config["EXPERIMENTATION_ENABLED"]
    if not enabled:
        return {}

    client = Statsig.shared()
    statsig_user = _get_statsig_user(context)
    # TODO: remove type: ignore when upstream fixes types, see https://github.com/statsig-io/statsig-server-core/issues/36
    dynamic_config = client.get_dynamic_config(statsig_user, config_name)  # type: ignore[attr-defined]

    if not dynamic_config:
        return {}
    return dynamic_config.value
def log_event(
    context: CouchersContext,
    event_name: str,
    value: str | float | None = None,
    metadata: dict[str, str] | None = None,
) -> None:
    """
    Record a custom analytics event for the user in this context.

    Does nothing when experimentation is disabled.

    Args:
        context: The CouchersContext for the current request
        event_name: Name of the event
        value: Optional value associated with the event
        metadata: Optional metadata dictionary

    Raises:
        ExperimentationNotInitializedError: If experimentation is enabled but not initialized.
    """
    _check_initialized()

    if config["EXPERIMENTATION_ENABLED"]:
        client = Statsig.shared()
        statsig_user = _get_statsig_user(context)
        client.log_event(
            user=statsig_user,
            event_name=event_name,
            value=value,
            metadata=metadata,
        )