Coverage for src/couchers/experimentation.py: 31%

65 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-20 11:53 +0000

1""" 

2Experimentation framework for feature flags and experiments. 

3 

4Uses Statsig under the hood, but abstracts the implementation details. 

5 

6IMPORTANT - FORKING SAFETY: 

7The underlying SDK uses internal threading and async runtime components that 

8do NOT work correctly when copied across process boundaries during fork(). 

9Initializing before forking will cause deadlocks and unpredictable behavior. 

10 

11This module provides fork-safe initialization: 

12- Call `setup_experimentation()` ONLY in child processes AFTER forking 

13- Call `setup_experimentation()` in the main process ONLY AFTER all child processes have been spawned 

14- NEVER call setup_experimentation() at module load time 

15 

16The Couchers backend uses multiprocessing.Process for background workers, which forks. 

17The initialization flow is: 

181. app.py starts, spawns worker processes via multiprocessing.Process (fork happens) 

192. Each worker process calls `_run_forever()` which calls `setup_experimentation()` post-fork 

203. The main process (API server) calls `setup_experimentation()` after spawning workers 

21""" 

22 

23import atexit 

24import logging 

25from typing import TYPE_CHECKING 

26 

27from statsig_python_core import Statsig, StatsigOptions, StatsigUser 

28 

29from couchers.config import config 

30 

31if TYPE_CHECKING: 

32 from couchers.context import CouchersContext 

33 

34logger = logging.getLogger(__name__) 

35 

36# Track whether we've initialized in this process to prevent double-initialization 

37_initialized = False 

38 

39 

40class ExperimentationNotInitializedError(Exception): 

41 """Raised when experimentation functions are called before initialization.""" 

42 

43 

44def setup_experimentation() -> None: 

45 """ 

46 Initialize the experimentation framework. Must be called AFTER process forking. 

47 

48 This function is safe to call multiple times - subsequent calls will be no-ops. 

49 

50 Call this: 

51 - In worker processes: inside _run_forever() after db_post_fork() 

52 - In main process: after spawning all worker processes 

53 

54 IMPORTANT: Importing this module is safe before forking. Only calling this 

55 function (which initializes internal threads) must happen after forking. 

56 """ 

57 global _initialized 

58 

59 if _initialized: 

60 logger.debug("Experimentation already initialized in this process, skipping") 

61 return 

62 

63 if not config["EXPERIMENTATION_ENABLED"]: 

64 logger.info("Experimentation is disabled, skipping initialization") 

65 _initialized = True 

66 return 

67 

68 logger.info("Initializing experimentation framework") 

69 

70 options = StatsigOptions() 

71 options.environment = config["STATSIG_ENVIRONMENT"] 

72 

73 # Create the shared instance for global access 

74 statsig = Statsig.new_shared(config["STATSIG_SERVER_SECRET_KEY"], options) 

75 

76 # initialize() starts internal threads - this MUST happen after forking 

77 statsig.initialize().wait() 

78 

79 _initialized = True 

80 logger.info("Experimentation framework initialized successfully") 

81 

82 # Verify the integration works by checking a test gate 

83 test_user = StatsigUser(user_id="integration_test") 

84 test_gate_result = statsig.check_gate(test_user, "test_statsig_integration") 

85 logger.info(f"Experimentation integration test: gate 'test_statsig_integration' = {test_gate_result}") 

86 

87 atexit.register(_shutdown_experimentation) 

88 

89 

90def _shutdown_experimentation() -> None: 

91 """ 

92 Shutdown the experimentation framework, flushing any pending events. 

93 Called automatically via atexit when the process exits. 

94 """ 

95 if not config["EXPERIMENTATION_ENABLED"]: 

96 return 

97 

98 if Statsig.has_shared_instance(): 

99 logger.info("Shutting down experimentation framework") 

100 Statsig.shared().shutdown().wait() 

101 Statsig.remove_shared() 

102 

103 

104def _check_initialized() -> None: 

105 """ 

106 Check that experimentation is initialized if enabled. 

107 

108 Raises: 

109 ExperimentationNotInitializedError: If experimentation is enabled but not initialized. 

110 """ 

111 if config["EXPERIMENTATION_ENABLED"] and not _initialized: 

112 raise ExperimentationNotInitializedError( 

113 "Experimentation is not initialized - call setup_experimentation() first" 

114 ) 

115 

116 

117def _get_statsig_user(context: "CouchersContext") -> StatsigUser: 

118 """ 

119 Get or create a cached StatsigUser for the given context. 

120 

121 The StatsigUser is cached on the context to avoid recreating it for each call. 

122 """ 

123 if not hasattr(context, "_statsig_user"): 

124 context._statsig_user = StatsigUser(user_id=str(context.user_id)) # type: ignore[attr-defined] 

125 return context._statsig_user # type: ignore[attr-defined, no-any-return] 

126 

127 

128def check_gate(context: "CouchersContext", gate_name: str) -> bool: 

129 """ 

130 Check if a feature gate is enabled for the user in this context. 

131 

132 Args: 

133 context: The CouchersContext for the current request 

134 gate_name: The name of the feature gate 

135 

136 Returns: 

137 True if the gate is enabled for this user, False otherwise. 

138 Returns False if experimentation is disabled. 

139 Returns True if EXPERIMENTATION_PASS_ALL_GATES is enabled. 

140 

141 Raises: 

142 ExperimentationNotInitializedError: If experimentation is enabled but not initialized. 

143 """ 

144 _check_initialized() 

145 if config["EXPERIMENTATION_PASS_ALL_GATES"]: 

146 return True 

147 if not config["EXPERIMENTATION_ENABLED"]: 

148 return False 

149 return Statsig.shared().check_gate(_get_statsig_user(context), gate_name) 

150 

151 

152def get_experiment(context: "CouchersContext", experiment_name: str) -> dict[str, object]: 

153 """ 

154 Get experiment configuration for the user in this context. 

155 

156 Args: 

157 context: The CouchersContext for the current request 

158 experiment_name: The name of the experiment 

159 

160 Returns: 

161 A dictionary with experiment values. 

162 Returns empty dict if experimentation is disabled. 

163 

164 Raises: 

165 ExperimentationNotInitializedError: If experimentation is enabled but not initialized. 

166 """ 

167 _check_initialized() 

168 if not config["EXPERIMENTATION_ENABLED"]: 

169 return {} 

170 experiment = Statsig.shared().get_experiment(_get_statsig_user(context), experiment_name) 

171 return experiment.value if experiment else {} 

172 

173 

174def get_dynamic_config(context: "CouchersContext", config_name: str) -> dict[str, object]: 

175 """ 

176 Get dynamic config for the user in this context. 

177 

178 Args: 

179 context: The CouchersContext for the current request 

180 config_name: The name of the dynamic config 

181 

182 Returns: 

183 A dictionary with config values. 

184 Returns empty dict if experimentation is disabled. 

185 

186 Raises: 

187 ExperimentationNotInitializedError: If experimentation is enabled but not initialized. 

188 """ 

189 _check_initialized() 

190 if not config["EXPERIMENTATION_ENABLED"]: 

191 return {} 

192 dynamic_config = Statsig.shared().get_dynamic_config(_get_statsig_user(context), config_name) 

193 return dynamic_config.value if dynamic_config else {} 

194 

195 

196def log_event( 

197 context: "CouchersContext", 

198 event_name: str, 

199 value: str | float | None = None, 

200 metadata: dict[str, str] | None = None, 

201) -> None: 

202 """ 

203 Log a custom event for analytics. 

204 

205 Args: 

206 context: The CouchersContext for the current request 

207 event_name: Name of the event 

208 value: Optional value associated with the event 

209 metadata: Optional metadata dictionary 

210 

211 Raises: 

212 ExperimentationNotInitializedError: If experimentation is enabled but not initialized. 

213 """ 

214 _check_initialized() 

215 if not config["EXPERIMENTATION_ENABLED"]: 

216 return 

217 Statsig.shared().log_event( 

218 user=_get_statsig_user(context), 

219 event_name=event_name, 

220 value=value, 

221 metadata=metadata, 

222 )