Coverage for src/couchers/experimentation.py: 31%
65 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-20 11:53 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-20 11:53 +0000
1"""
2Experimentation framework for feature flags and experiments.
4Uses Statsig under the hood, but abstracts the implementation details.
6IMPORTANT - FORKING SAFETY:
7The underlying SDK uses internal threading and async runtime components that
8do NOT work correctly when copied across process boundaries during fork().
9Initializing before forking will cause deadlocks and unpredictable behavior.
11This module provides fork-safe initialization:
12- Call `setup_experimentation()` ONLY in child processes AFTER forking
13- Call `setup_experimentation()` in the main process ONLY AFTER all child processes have been spawned
14- NEVER call setup_experimentation() at module load time
16The Couchers backend uses multiprocessing.Process for background workers, which forks.
17The initialization flow is:
181. app.py starts, spawns worker processes via multiprocessing.Process (fork happens)
192. Each worker process calls `_run_forever()` which calls `setup_experimentation()` post-fork
203. The main process (API server) calls `setup_experimentation()` after spawning workers
21"""
23import atexit
24import logging
25from typing import TYPE_CHECKING
27from statsig_python_core import Statsig, StatsigOptions, StatsigUser
29from couchers.config import config
31if TYPE_CHECKING:
32 from couchers.context import CouchersContext
34logger = logging.getLogger(__name__)
36# Track whether we've initialized in this process to prevent double-initialization
37_initialized = False
40class ExperimentationNotInitializedError(Exception):
41 """Raised when experimentation functions are called before initialization."""
44def setup_experimentation() -> None:
45 """
46 Initialize the experimentation framework. Must be called AFTER process forking.
48 This function is safe to call multiple times - subsequent calls will be no-ops.
50 Call this:
51 - In worker processes: inside _run_forever() after db_post_fork()
52 - In main process: after spawning all worker processes
54 IMPORTANT: Importing this module is safe before forking. Only calling this
55 function (which initializes internal threads) must happen after forking.
56 """
57 global _initialized
59 if _initialized:
60 logger.debug("Experimentation already initialized in this process, skipping")
61 return
63 if not config["EXPERIMENTATION_ENABLED"]:
64 logger.info("Experimentation is disabled, skipping initialization")
65 _initialized = True
66 return
68 logger.info("Initializing experimentation framework")
70 options = StatsigOptions()
71 options.environment = config["STATSIG_ENVIRONMENT"]
73 # Create the shared instance for global access
74 statsig = Statsig.new_shared(config["STATSIG_SERVER_SECRET_KEY"], options)
76 # initialize() starts internal threads - this MUST happen after forking
77 statsig.initialize().wait()
79 _initialized = True
80 logger.info("Experimentation framework initialized successfully")
82 # Verify the integration works by checking a test gate
83 test_user = StatsigUser(user_id="integration_test")
84 test_gate_result = statsig.check_gate(test_user, "test_statsig_integration")
85 logger.info(f"Experimentation integration test: gate 'test_statsig_integration' = {test_gate_result}")
87 atexit.register(_shutdown_experimentation)
90def _shutdown_experimentation() -> None:
91 """
92 Shutdown the experimentation framework, flushing any pending events.
93 Called automatically via atexit when the process exits.
94 """
95 if not config["EXPERIMENTATION_ENABLED"]:
96 return
98 if Statsig.has_shared_instance():
99 logger.info("Shutting down experimentation framework")
100 Statsig.shared().shutdown().wait()
101 Statsig.remove_shared()
104def _check_initialized() -> None:
105 """
106 Check that experimentation is initialized if enabled.
108 Raises:
109 ExperimentationNotInitializedError: If experimentation is enabled but not initialized.
110 """
111 if config["EXPERIMENTATION_ENABLED"] and not _initialized:
112 raise ExperimentationNotInitializedError(
113 "Experimentation is not initialized - call setup_experimentation() first"
114 )
117def _get_statsig_user(context: "CouchersContext") -> StatsigUser:
118 """
119 Get or create a cached StatsigUser for the given context.
121 The StatsigUser is cached on the context to avoid recreating it for each call.
122 """
123 if not hasattr(context, "_statsig_user"):
124 context._statsig_user = StatsigUser(user_id=str(context.user_id)) # type: ignore[attr-defined]
125 return context._statsig_user # type: ignore[attr-defined, no-any-return]
128def check_gate(context: "CouchersContext", gate_name: str) -> bool:
129 """
130 Check if a feature gate is enabled for the user in this context.
132 Args:
133 context: The CouchersContext for the current request
134 gate_name: The name of the feature gate
136 Returns:
137 True if the gate is enabled for this user, False otherwise.
138 Returns False if experimentation is disabled.
139 Returns True if EXPERIMENTATION_PASS_ALL_GATES is enabled.
141 Raises:
142 ExperimentationNotInitializedError: If experimentation is enabled but not initialized.
143 """
144 _check_initialized()
145 if config["EXPERIMENTATION_PASS_ALL_GATES"]:
146 return True
147 if not config["EXPERIMENTATION_ENABLED"]:
148 return False
149 return Statsig.shared().check_gate(_get_statsig_user(context), gate_name)
152def get_experiment(context: "CouchersContext", experiment_name: str) -> dict[str, object]:
153 """
154 Get experiment configuration for the user in this context.
156 Args:
157 context: The CouchersContext for the current request
158 experiment_name: The name of the experiment
160 Returns:
161 A dictionary with experiment values.
162 Returns empty dict if experimentation is disabled.
164 Raises:
165 ExperimentationNotInitializedError: If experimentation is enabled but not initialized.
166 """
167 _check_initialized()
168 if not config["EXPERIMENTATION_ENABLED"]:
169 return {}
170 experiment = Statsig.shared().get_experiment(_get_statsig_user(context), experiment_name)
171 return experiment.value if experiment else {}
174def get_dynamic_config(context: "CouchersContext", config_name: str) -> dict[str, object]:
175 """
176 Get dynamic config for the user in this context.
178 Args:
179 context: The CouchersContext for the current request
180 config_name: The name of the dynamic config
182 Returns:
183 A dictionary with config values.
184 Returns empty dict if experimentation is disabled.
186 Raises:
187 ExperimentationNotInitializedError: If experimentation is enabled but not initialized.
188 """
189 _check_initialized()
190 if not config["EXPERIMENTATION_ENABLED"]:
191 return {}
192 dynamic_config = Statsig.shared().get_dynamic_config(_get_statsig_user(context), config_name)
193 return dynamic_config.value if dynamic_config else {}
196def log_event(
197 context: "CouchersContext",
198 event_name: str,
199 value: str | float | None = None,
200 metadata: dict[str, str] | None = None,
201) -> None:
202 """
203 Log a custom event for analytics.
205 Args:
206 context: The CouchersContext for the current request
207 event_name: Name of the event
208 value: Optional value associated with the event
209 metadata: Optional metadata dictionary
211 Raises:
212 ExperimentationNotInitializedError: If experimentation is enabled but not initialized.
213 """
214 _check_initialized()
215 if not config["EXPERIMENTATION_ENABLED"]:
216 return
217 Statsig.shared().log_event(
218 user=_get_statsig_user(context),
219 event_name=event_name,
220 value=value,
221 metadata=metadata,
222 )