Coverage for app/backend/src/couchers/experimentation.py: 83%
152 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
1"""
2Feature flag and experimentation framework.
4FEATURE_FLAGS_FILE_OVERRIDE_PATH (dev only) reads flags from a JSON file instead of GrowthBook;
5unknown flags fall through to their in-code default either way.
7Two ways to evaluate a flag:
8 - Per-user/request: use the CouchersContext methods (context.get_boolean_value, get_string_value,
9 etc.), which evaluate for the context's user and own the per-request evaluator cache.
10 - Global (no user/request): use the module-level get_global_boolean_value / get_global_string_value
11 / ... below. Use these ONLY when there is genuinely no user to evaluate for and no way to thread
12 one through - per-user evaluation is impossible here, not merely that you don't expect the value
13 to vary per user. Whenever a user is (or could reasonably be) available, use the context: only the
14 per-user path can do percentage rollouts, experiments, and feature-usage tracking.
16setup_experimentation() is called once at process startup.
17"""
19import json
20import logging
21import threading
22import time
23from collections.abc import Callable
24from functools import cache
25from pathlib import Path
26from tempfile import NamedTemporaryFile
27from typing import Any
29import urllib3
30from growthbook import GrowthBook
31from growthbook.common_types import Experiment, FeatureResult, Result
32from sqlalchemy.dialects.postgresql import insert
34from couchers import metrics
35from couchers.config import config
36from couchers.db import session_scope
37from couchers.models.logging import ExperimentExposure, ExposureSource, FeatureUsage
logger = logging.getLogger(__name__)

# Wait between background refreshes, and the connect/read timeouts applied to each GrowthBook fetch.
_REFRESH_INTERVAL_SECONDS = 60
_HTTP_CONNECT_TIMEOUT_SECONDS = 1
_HTTP_READ_TIMEOUT_SECONDS = 2

# Set to True by setup_experimentation() once startup has completed (in either mode).
_initialized = False
# In-memory feature snapshot. Read and replaced only while holding _state_lock.
_state: dict[str, Any] = {"features": {}, "savedGroups": {}}
_state_lock = threading.Lock()
# Stop signal waited on by the refresh loop, and the handle of the thread running that loop.
_refresh_stop = threading.Event()
_refresh_thread: threading.Thread | None = None
# Unix time of the last successful pull from GrowthBook (None until the first success). Set when we
# load from the API or seed from the disk cache; drives the staleness metric.
_last_fetch_time: float | None = None
class ExperimentationNotInitializedError(Exception):
    """Signals that a flag was evaluated before setup_experimentation() had run."""
class GrowthBookUnavailableError(Exception):
    """Startup failure: the feature fetch failed and no disk cache exists to fall back on."""
def _fetch_features() -> dict[str, Any] | None:
    """
    Fetch the GrowthBook feature payload over HTTP.

    Returns the decoded JSON object on success. Returns None - after logging - on any failure:
    a network/timeout error, an HTTP status >= 400, a body that is not valid JSON, or a body that
    is valid JSON but not an object. Request and decoding errors are swallowed here so callers can
    treat None as "keep whatever you already have".
    """
    api_host = config.GROWTHBOOK_API_HOST.rstrip("/")
    client_key = config.GROWTHBOOK_CLIENT_KEY
    url = f"{api_host}/api/features/{client_key}"
    try:
        timeout = urllib3.Timeout(connect=_HTTP_CONNECT_TIMEOUT_SECONDS, read=_HTTP_READ_TIMEOUT_SECONDS)
        # A fresh pool is built per fetch; the context manager releases its connections on exit
        # instead of leaving them for the garbage collector.
        with urllib3.PoolManager(timeout=timeout) as http:
            r = http.request("GET", url, headers={"Accept-Encoding": "gzip, deflate"})
            if r.status >= 400:
                logger.warning("GrowthBook fetch returned status %d", r.status)
                return None
            payload = json.loads(r.data.decode("utf-8"))
    except Exception:
        logger.exception("GrowthBook fetch failed")
        return None
    # Callers use dict methods on the result, so reject anything that isn't a JSON object here
    # rather than letting it blow up later.
    if not isinstance(payload, dict):
        logger.warning("GrowthBook fetch returned a non-object payload (%s)", type(payload).__name__)
        return None
    return payload
def _apply_response(response: dict[str, Any]) -> None:
    """Swap both sections of the in-memory snapshot for those in `response`, under the state lock.

    A section missing from `response` is stored as an empty dict.
    """
    with _state_lock:
        for section in ("features", "savedGroups"):
            _state[section] = response.get(section, {})
def _set_last_fetch_time(when: float) -> None:
    """Store `when` (Unix seconds) as the module-level time of the last successful feature load."""
    global _last_fetch_time
    _last_fetch_time = when
def seconds_since_last_fetch() -> float | None:
    """Age in seconds of the last successful pull (clamped at 0), or None if there has been none.

    Feeds the staleness metric.
    """
    last = _last_fetch_time
    return None if last is None else max(0.0, time.time() - last)
def _write_cache(response: dict[str, Any]) -> None:
    """
    Persist `response` plus the current time to the GrowthBook disk cache.

    The payload is written to a temp file next to the target and then renamed over it: rename is
    atomic within a filesystem, so a reader never sees a half-written cache. If the write or the
    rename fails, the temp file is removed before the error is re-raised so failed attempts do not
    pile up stray `.tmp` files.
    """
    path = Path(config.GROWTHBOOK_CACHE_PATH)
    data = json.dumps({"fetched_at": time.time(), "response": response})
    tmp: Path | None = None
    try:
        with NamedTemporaryFile("w", dir=path.parent, prefix=".growthbook-cache-", suffix=".tmp", delete=False) as f:
            tmp = Path(f.name)
            f.write(data)
        tmp.replace(path)
    except BaseException:
        # delete=False means nothing else cleans this up; after a successful replace the temp
        # name no longer exists, so we only get here with a leftover file (or none at all).
        if tmp is not None:
            tmp.unlink(missing_ok=True)
        raise
def _read_cache() -> tuple[dict[str, Any], float] | None:
    """Load the disk cache as (response, fetched_at).

    Returns None when the cache file does not exist yet. A file that exists but cannot be parsed,
    or lacks the expected keys, raises.
    """
    cache_file = Path(config.GROWTHBOOK_CACHE_PATH)
    if cache_file.exists():
        payload = json.loads(cache_file.read_text())
        return payload["response"], payload["fetched_at"]
    return None
122def _refresh_loop() -> None:
123 while not _refresh_stop.wait(_REFRESH_INTERVAL_SECONDS): 123 ↛ 124line 123 didn't jump to line 124 because the condition on line 123 was never true
124 response = _fetch_features()
125 if response is not None:
126 _apply_response(response)
127 _write_cache(response)
128 _set_last_fetch_time(time.time())
129 logger.debug("GrowthBook features refreshed")
130 # On a failed fetch, keep last-known-good state and retry next tick; the staleness metric climbs.
@cache
def _load_local_flags(path_str: str) -> dict[str, Any]:
    """
    Read and validate the dev-only override file; cached per path.

    `path_str` is resolved relative to the backend root, independent of cwd (absolute paths are
    left untouched). The file is decoded as UTF-8 explicitly rather than with the platform's
    locale encoding. Raises ValueError if the top-level JSON value is not an object; file and
    JSON decoding errors propagate.
    """
    path = Path(__file__).parent / ".." / ".." / path_str
    loaded = json.loads(path.read_text(encoding="utf-8"))
    if not isinstance(loaded, dict):
        raise ValueError(f"Feature flag override file {path} must contain a JSON object")
    logger.info("Loaded %d feature flag override(s) from %s", len(loaded), path)
    return loaded
def setup_experimentation() -> None:
    """
    Initialize the feature flag framework.

    Safe to call multiple times - subsequent calls are no-ops. In GrowthBook mode this fetches the
    feature payload once synchronously, then starts a background thread that refreshes every minute;
    request threads only ever read the in-memory snapshot. In local-file mode (dev only) it loads the
    JSON file once and never contacts GrowthBook.

    Raises GrowthBookUnavailableError if the initial fetch fails and there is no disk cache to fall
    back on.
    """
    global _initialized, _refresh_thread

    if _initialized:
        return

    if config.FEATURE_FLAGS_FILE_OVERRIDE_PATH:
        # Load eagerly so a missing or malformed override file fails at startup, not on first use.
        _load_local_flags(config.FEATURE_FLAGS_FILE_OVERRIDE_PATH)
        _initialized = True
        return

    logger.info("Initializing experimentation framework")

    response = _fetch_features()
    if response is not None:
        _apply_response(response)
        _write_cache(response)
        _set_last_fetch_time(time.time())
        logger.info("GrowthBook features loaded from API")
    else:
        # Unreachable at startup: fall back to the disk cache rather than booting on in-code defaults.
        cached = _read_cache()
        if cached is None:
            raise GrowthBookUnavailableError(
                "Could not fetch features from GrowthBook and no disk cache is available - refusing to "
                "start on in-code feature-flag defaults"
            )
        cached_response, fetched_at = cached
        _apply_response(cached_response)
        _set_last_fetch_time(fetched_at)
        logger.warning(
            "GrowthBook unavailable at startup; loaded features from disk cache (%.0fs old)",
            max(0.0, time.time() - fetched_at),
        )

    # Snapshot under the lock, evaluate outside it - same pattern as _create_evaluator.
    with _state_lock:
        features = _state["features"]
        saved_groups = _state["savedGroups"]
    smoke_gb = GrowthBook(features=features, savedGroups=saved_groups)
    test_gate_result = smoke_gb.is_on("test_growthbook_integration")

    _refresh_stop.clear()
    _refresh_thread = threading.Thread(target=_refresh_loop, name="growthbook-refresh", daemon=True)
    _refresh_thread.start()

    _initialized = True
    logger.info("Experimentation integration test: gate 'test_growthbook_integration' = %s", test_gate_result)
def _record_exposure(user_id: int, experiment: Experiment, result: Result, **_: Any) -> None:
    """Insert an ExperimentExposure row for this user/experiment/variation.

    The insert is a no-op when it conflicts with the `uq_experiment_exposures_user_exp_var`
    constraint. Extra keyword arguments are accepted and ignored.
    """
    details = dict(
        experiment_name=experiment.name,
        variation_key=result.key,
        variation_name=result.name,
        hash_attribute=result.hashAttribute,
        hash_value=result.hashValue,
        bucket=result.bucket,
        in_experiment=result.inExperiment,
        hash_used=result.hashUsed,
        sticky_bucket_used=result.stickyBucketUsed,
        feature_id=result.featureId,
    )
    row = {
        "user_id": user_id,
        "experiment_key": experiment.key,
        "variation_id": result.variationId,
        "source": ExposureSource.backend,
        "data": details,
    }
    upsert = insert(ExperimentExposure).values(**row)
    upsert = upsert.on_conflict_do_nothing(constraint="uq_experiment_exposures_user_exp_var")
    with session_scope() as session:
        session.execute(upsert)
def _record_feature_usage(user_id: int, key: str, result: FeatureResult, **_: Any) -> None:
    """Add a FeatureUsage row holding the evaluated value of `key` for this user.

    Extra keyword arguments are accepted and ignored.
    """
    with session_scope() as session:
        usage = FeatureUsage(user_id=user_id, feature_key=key, value=result.value)
        session.add(usage)
def _create_evaluator(user_id: int | None) -> GrowthBook:
    """
    Build a per-request GrowthBook evaluator over the current feature snapshot.

    With a user_id, the evaluator gets an `id` attribute and its callbacks record experiment
    exposures and feature usage for that user. With user_id=None (anonymous / logged-out) no
    attributes are set and the callbacks record nothing; with no `id` GrowthBook can't bucket the
    user, so experiments and percentage rollouts are skipped and flags fall through to their
    defaults.

    Only the in-memory snapshot maintained by the background refresh thread is read - no HTTP on
    the request path. Constructing without `client_key` keeps the GrowthBook a pure evaluator: no
    callback registration on the library's process-wide singleton. The caller is responsible for
    caching the result for the lifetime of a request.

    Raises ExperimentationNotInitializedError if setup_experimentation() has not completed.
    """
    if not _initialized:
        raise ExperimentationNotInitializedError(
            "Experimentation is not initialized - call setup_experimentation() first"
        )

    # Grab both references together so the pair comes from the same snapshot.
    with _state_lock:
        features, saved_groups = _state["features"], _state["savedGroups"]

    def on_experiment_viewed(experiment: Experiment, result: Result, **kwargs: Any) -> None:
        if user_id is None:
            return
        _record_exposure(user_id, experiment, result)

    def on_feature_usage(key: str, result: FeatureResult, *args: Any, **kwargs: Any) -> None:
        if user_id is None:
            return
        _record_feature_usage(user_id, key, result)

    if user_id is not None:
        attributes = {"id": str(user_id)}
    else:
        attributes = {}

    return GrowthBook(
        attributes=attributes,
        features=features,
        savedGroups=saved_groups,
        on_experiment_viewed=on_experiment_viewed,
        on_feature_usage=on_feature_usage,
    )
def _global_evaluator() -> GrowthBook:
    """Return an anonymous evaluator, for flag evaluation when there is no user/request context."""
    anonymous = _create_evaluator(user_id=None)
    return anonymous
276# Single home of the gating logic, shared by the global functions below and by CouchersContext (which
277# passes its own cached per-request evaluator). get_evaluator stays lazy - skipped in file-override mode.
278def _feature_value[T](flag_key: str, default: T, get_evaluator: Callable[[], GrowthBook]) -> T:
279 if config.FEATURE_FLAGS_FILE_OVERRIDE_PATH:
280 return _load_local_flags(config.FEATURE_FLAGS_FILE_OVERRIDE_PATH).get(flag_key, default) # type: ignore[no-any-return]
281 result = get_evaluator().eval_feature(flag_key)
282 value = default if result.value is None else result.value
283 metrics.observe_feature_flag_evaluation(flag_key, result.source, value)
284 return value
287# Global (no-user) flag evaluation. Use these ONLY when there is genuinely no user to evaluate for and
288# no way to thread one through - per-user evaluation is impossible here, not merely that you don't
289# expect the value to vary per user. If a user is (or could reasonably be) available, use the
290# CouchersContext methods instead: only the per-user path does percentage rollouts, experiments, and
291# feature-usage tracking. With no user to bucket, rollouts and experiments are skipped and flags fall
292# through to their in-code defaults unless a rule forces a value globally.
def get_global_boolean_value(flag_key: str, default: bool) -> bool:
    """Evaluate a boolean flag with no user context; `default` is used when the flag has no value."""
    return _feature_value(flag_key, default, get_evaluator=_global_evaluator)
def get_global_string_value(flag_key: str, default: str) -> str:
    """Evaluate a string flag with no user context; `default` is used when the flag has no value."""
    return _feature_value(flag_key, default, get_evaluator=_global_evaluator)
def get_global_integer_value(flag_key: str, default: int) -> int:
    """Evaluate an integer flag with no user context; `default` is used when the flag has no value."""
    return _feature_value(flag_key, default, get_evaluator=_global_evaluator)
def get_global_float_value(flag_key: str, default: float) -> float:
    """Evaluate a float flag with no user context; `default` is used when the flag has no value."""
    return _feature_value(flag_key, default, get_evaluator=_global_evaluator)
309def get_global_object_value[T](flag_key: str, default: T) -> T:
310 return _feature_value(flag_key, default, _global_evaluator)