Source code for causalpy.checks.placebo_in_time

#   Copyright 2022 - 2026 The PyMC Labs Developers
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""
Placebo-in-time sensitivity check with hierarchical null model.

Builds a hierarchical Bayesian model of the "status quo" (no-effect)
distribution from placebo folds, then compares the actual intervention
effect against that learned null.  Optionally computes Bayesian
assurance (operating characteristics) against a user-supplied
expected-effect prior.

Supports experiments with a ``treatment_time`` parameter
(InterruptedTimeSeries, SyntheticControl).  Requires a PyMC model
for posterior extraction.
"""

from __future__ import annotations

import logging
from dataclasses import dataclass, field
from typing import Any

import numpy as np
import pandas as pd
import pymc as pm
import xarray as xr

from causalpy.checks.base import CheckResult, clone_model
from causalpy.experiments.base import BaseExperiment
from causalpy.experiments.interrupted_time_series import InterruptedTimeSeries
from causalpy.experiments.synthetic_control import SyntheticControl
from causalpy.pipeline import PipelineContext
from causalpy.pymc_models import PyMCModel

logger = logging.getLogger(__name__)

MIN_FOLD_OBSERVATIONS = 3

_DEFAULT_SAMPLE_KWARGS: dict[str, Any] = {
    "draws": 1000,
    "chains": 4,
    "target_accept": 0.97,
}


[docs] @dataclass class PlaceboFoldResult: """Result of a single placebo fold. Attributes ---------- fold : int Fold number (1-indexed). pseudo_treatment_time : Any The shifted treatment time for this fold. experiment : BaseExperiment The fitted experiment for this fold. cumulative_impact_samples : xr.DataArray Posterior samples of the cumulative (summed) impact for this fold. fold_mean : float Posterior mean of the cumulative impact. fold_sd : float Posterior standard deviation of the cumulative impact. """ fold: int pseudo_treatment_time: Any experiment: BaseExperiment cumulative_impact_samples: xr.DataArray fold_mean: float fold_sd: float
[docs] @dataclass class AssuranceResult: """Bayesian operating characteristics from design-level simulation. Attributes ---------- true_positive_rate : float P(decide "positive" | alternative true). This *is* the assurance. false_positive_rate : float P(decide "positive" | null true). true_negative_rate : float P(decide "null" | null true). false_negative_rate : float P(decide "null" | alternative true). null_indeterminate_rate : float P(decide "indeterminate" | null true). alt_indeterminate_rate : float P(decide "indeterminate" | alternative true). null_decisions : np.ndarray Raw decision strings under the null scenario. alt_decisions : np.ndarray Raw decision strings under the alternative scenario. """ true_positive_rate: float false_positive_rate: float true_negative_rate: float false_negative_rate: float null_indeterminate_rate: float alt_indeterminate_rate: float null_decisions: np.ndarray = field(repr=False) alt_decisions: np.ndarray = field(repr=False)
[docs] class PlaceboInTime: """Placebo-in-time sensitivity check with hierarchical null model. Shifts the treatment time backward into the pre-intervention period to create ``n_folds`` placebo experiments. Extracts the posterior cumulative impact from each fold, then fits a hierarchical Bayesian model to characterise the "status quo" distribution of effects when no intervention occurred. The actual intervention's cumulative effect is compared against this learned null. When ``expected_effect_prior`` and ``rope_half_width`` are provided, additionally computes Bayesian assurance (operating characteristics) via simulation. Parameters ---------- n_folds : int, default 3 Number of placebo folds to create. Must be >= 1. experiment_factory : callable, optional Custom factory ``(data, treatment_time) -> BaseExperiment``. If ``None`` (default), the factory is derived from the pipeline's ``experiment_config``. Required for standalone (non-pipeline) use. sample_kwargs : dict, optional MCMC settings for the hierarchical status-quo model. Defaults to ``{"draws": 1000, "chains": 4, "target_accept": 0.97}``. threshold : float, default 0.95 Probability cutoff. Used both for ``passed`` (P(actual effect outside null) must exceed this) and for the ROPE decision rule when computing assurance. prior_scale : float, default 1.0 Multiplier for auto-computed prior widths on the hierarchical model. The priors are ``mu ~ Normal(center, 5 * prior_scale * data_scale)`` and ``tau ~ HalfNormal(2 * prior_scale * data_scale)``. expected_effect_prior : distribution or array, optional Prior belief about the true total effect under the alternative hypothesis. Accepts any object with an ``.rvs(n)`` method (PreliZ, scipy) or a numpy array of pre-drawn samples. When provided together with ``rope_half_width``, assurance analysis runs automatically. rope_half_width : float, optional Half-width of the ROPE interval ``[-rope, +rope]``. Required when ``expected_effect_prior`` is provided. n_design_replications : int, optional Number of simulation replications for assurance. Defaults to ``min(theta_new.size, expected_effect_samples.size)``. random_seed : int, optional RNG seed for the assurance simulation. Examples -------- >>> import causalpy as cp # doctest: +SKIP >>> check = cp.checks.PlaceboInTime(n_folds=3) # doctest: +SKIP """ applicable_methods: set[type[BaseExperiment]] = { InterruptedTimeSeries, SyntheticControl, }
[docs] def __init__( self, n_folds: int = 3, experiment_factory: Any | None = None, sample_kwargs: dict[str, Any] | None = None, threshold: float = 0.95, prior_scale: float = 1.0, expected_effect_prior: Any | None = None, rope_half_width: float | None = None, n_design_replications: int | None = None, random_seed: int | None = None, ) -> None: if n_folds < 1: raise ValueError("n_folds must be >= 1") if expected_effect_prior is not None and rope_half_width is None: raise ValueError( "rope_half_width is required when expected_effect_prior is " "provided. Specify the ROPE half-width that defines " "practical significance." ) self.n_folds = n_folds self.experiment_factory = experiment_factory self.sample_kwargs = {**_DEFAULT_SAMPLE_KWARGS, **(sample_kwargs or {})} self.threshold = threshold self.prior_scale = prior_scale self.expected_effect_prior = expected_effect_prior self.rope_half_width = rope_half_width self.n_design_replications = n_design_replications self.random_seed = random_seed
# ------------------------------------------------------------------ # Validation # ------------------------------------------------------------------
[docs] def validate(self, experiment: BaseExperiment) -> None: """Check the experiment is compatible with PlaceboInTime. Raises ------ TypeError If the experiment lacks ``treatment_time`` or uses a non-PyMC model. """ if not hasattr(experiment, "treatment_time"): raise TypeError( f"{type(experiment).__name__} does not have a treatment_time " f"attribute. PlaceboInTime requires experiments with an " f"explicit treatment time." ) if not isinstance(experiment.model, PyMCModel): raise TypeError( f"PlaceboInTime requires a PyMC model for posterior " f"extraction, but got {type(experiment.model).__name__}. " f"Use a PyMC model (e.g. cp.pymc_models.LinearRegression)." )
# ------------------------------------------------------------------ # Factory helpers (reused from original) # ------------------------------------------------------------------ def _get_factory(self, context: PipelineContext | None) -> Any: """Return a factory ``(data, treatment_time) -> experiment``.""" if self.experiment_factory is not None: return self.experiment_factory if context is None or context.experiment_config is None: raise RuntimeError( "No experiment_config in context and no experiment_factory " "provided. Use EstimateEffect before SensitivityAnalysis, " "or pass an explicit experiment_factory to PlaceboInTime." ) config = context.experiment_config method = config["method"] kwargs = {k: v for k, v in config.items() if k != "method"} def _factory(data: pd.DataFrame, treatment_time: Any) -> BaseExperiment: """Create a fresh experiment with the given treatment time.""" kw = dict(kwargs) kw["treatment_time"] = treatment_time if "model" in kw and kw["model"] is not None: kw["model"] = clone_model(kw["model"]) return method(data, **kw) return _factory # ------------------------------------------------------------------ # Fold geometry (reused from original) # ------------------------------------------------------------------ def _compute_intervention_length(self, experiment: BaseExperiment) -> Any: """Compute intervention length from the experiment.""" treatment_time = experiment.treatment_time # type: ignore[attr-defined] data = experiment.data # type: ignore[attr-defined] treatment_end = getattr(experiment, "treatment_end_time", None) if treatment_end is not None: return treatment_end - treatment_time if hasattr(data, "index"): return data.index.max() - treatment_time raise ValueError("Cannot determine intervention length from experiment.") def _compute_fold_treatment_times( self, treatment_time: Any, intervention_length: Any ) -> list[Any]: """Compute pseudo-treatment times for each fold.""" return [ treatment_time - (self.n_folds - fold) * intervention_length for fold in range(self.n_folds) ] def _get_fold_data( self, data: pd.DataFrame, pseudo_treatment_time: Any, intervention_length: Any, ) -> pd.DataFrame: """Extract data up to the end of the placebo intervention window.""" pseudo_end = pseudo_treatment_time + intervention_length return data.loc[data.index < pseudo_end].copy() # ------------------------------------------------------------------ # Posterior extraction # ------------------------------------------------------------------ @staticmethod def _extract_cumulative_impact(experiment: BaseExperiment) -> xr.DataArray: """Extract posterior cumulative impact from a fitted experiment. Returns an ``xr.DataArray`` with a single ``sample`` dimension obtained by summing over ``obs_ind`` and stacking ``(chain, draw)``. """ post_impact = experiment.post_impact # type: ignore[attr-defined] if "treated_units" in post_impact.dims: post_impact = post_impact.isel(treated_units=0) cumulative = post_impact.sum("obs_ind") return cumulative.stack(sample=("chain", "draw")) # ------------------------------------------------------------------ # Hierarchical status-quo model # ------------------------------------------------------------------ def _build_status_quo_model( self, fold_means: np.ndarray, fold_sds: np.ndarray, ) -> tuple[Any, np.ndarray]: """Fit the hierarchical status-quo model and return theta_new. Parameters ---------- fold_means : np.ndarray Per-fold posterior means of cumulative impact. fold_sds : np.ndarray Per-fold posterior SDs of cumulative impact. Returns ------- tuple[InferenceData, np.ndarray] ``(idata, theta_new_samples)`` where ``theta_new_samples`` are draws from the posterior predictive for a new null period. """ n_folds = len(fold_means) fold_sds = np.where(fold_sds < 1e-6, 1e-6, fold_sds) prior_mu_center = float(np.nanmean(fold_means)) prior_mu_scale = float(np.nanstd(fold_means)) if prior_mu_scale <= 0.0: prior_mu_scale = 1.0 scale = self.prior_scale coords = {"fold": np.arange(n_folds)} with pm.Model(coords=coords) as model: observed_fold_means = pm.Data( "observed_fold_means", fold_means, dims="fold" ) observed_fold_sd = pm.Data("observed_fold_sd", fold_sds, dims="fold") mu_status_quo = pm.Normal( "mu_status_quo", mu=prior_mu_center, sigma=5.0 * scale * prior_mu_scale, ) tau_status_quo = pm.HalfNormal( "tau_status_quo", sigma=2.0 * scale * prior_mu_scale, ) fold_z = pm.Normal("fold_z", mu=0.0, sigma=1.0, dims="fold") fold_true_effect = pm.Deterministic( "fold_true_effect", mu_status_quo + tau_status_quo * fold_z, dims="fold", ) pm.Normal( "likelihood_fold_means", mu=fold_true_effect, sigma=observed_fold_sd, observed=observed_fold_means, dims="fold", ) idata = pm.sample(**self.sample_kwargs) with model: model.add_coords({"new_period": np.arange(1)}) pm.Normal( "theta_new", mu=mu_status_quo, sigma=tau_status_quo, dims="new_period", ) pp = pm.sample_posterior_predictive(idata, var_names=["theta_new"]) theta_new_samples = ( pp["posterior_predictive"]["theta_new"] .stack(sample=("chain", "draw")) .values.squeeze() ) return idata, theta_new_samples # ------------------------------------------------------------------ # ROPE decision rule # ------------------------------------------------------------------
[docs] @staticmethod def bayesian_rope_decision( posterior_samples: np.ndarray, rope_half_width: float, threshold: float, ) -> str: """Apply a ROPE-based Bayesian decision rule. Parameters ---------- posterior_samples : np.ndarray Posterior draws of the total effect. rope_half_width : float Half-width of the ROPE interval ``[-rope, +rope]``. threshold : float Minimum posterior probability required to make a decision. Returns ------- str One of ``"positive"``, ``"null"``, or ``"indeterminate"``. """ samples = np.asarray(posterior_samples).ravel() prob_positive = float((samples > rope_half_width).mean()) prob_null = float((np.abs(samples) <= rope_half_width).mean()) if prob_positive >= threshold: return "positive" elif prob_null >= threshold: return "null" else: return "indeterminate"
# ------------------------------------------------------------------ # Assurance (operating characteristics) # ------------------------------------------------------------------ def _draw_expected_effect_samples(self, n: int) -> np.ndarray: """Draw samples from the expected-effect prior.""" prior = self.expected_effect_prior if prior is None: raise ValueError("expected_effect_prior is not set.") if isinstance(prior, np.ndarray): return prior if hasattr(prior, "rvs"): return np.asarray(prior.rvs(n)) # type: ignore[union-attr] raise TypeError( f"expected_effect_prior must be a numpy array or have an " f".rvs(n) method, got {type(prior).__name__}." ) def _compute_assurance( self, theta_new_samples: np.ndarray, fold_sds: np.ndarray, n_posterior_samples: int, ) -> AssuranceResult: """Simulate decisions under null and alternative to get assurance. Parameters ---------- theta_new_samples : np.ndarray Draws from the status-quo posterior predictive. fold_sds : np.ndarray Per-fold posterior SDs (used to simulate estimation noise). n_posterior_samples : int Number of posterior draws to simulate per replication. Returns ------- AssuranceResult """ expected_samples = self._draw_expected_effect_samples(len(theta_new_samples)) n_reps = self.n_design_replications if n_reps is None: n_reps = min(len(theta_new_samples), len(expected_samples)) rng = np.random.default_rng(self.random_seed) rope = self.rope_half_width if rope is None: raise ValueError( "rope_half_width must be set for assurance." ) # pragma: no cover null_decisions: list[str] = [] for i in range(n_reps): true_effect = float(theta_new_samples[i % len(theta_new_samples)]) sigma = float(rng.choice(fold_sds)) simulated_posterior = rng.normal( loc=true_effect, scale=sigma, size=n_posterior_samples ) null_decisions.append( self.bayesian_rope_decision(simulated_posterior, rope, self.threshold) ) alt_decisions: list[str] = [] for i in range(n_reps): true_effect = float(expected_samples[i % len(expected_samples)]) sigma = float(rng.choice(fold_sds)) simulated_posterior = rng.normal( loc=true_effect, scale=sigma, size=n_posterior_samples ) alt_decisions.append( self.bayesian_rope_decision(simulated_posterior, rope, self.threshold) ) null_arr = np.array(null_decisions) alt_arr = np.array(alt_decisions) return AssuranceResult( true_positive_rate=float((alt_arr == "positive").mean()), false_positive_rate=float((null_arr == "positive").mean()), true_negative_rate=float((null_arr == "null").mean()), false_negative_rate=float((alt_arr == "null").mean()), null_indeterminate_rate=float((null_arr == "indeterminate").mean()), alt_indeterminate_rate=float((alt_arr == "indeterminate").mean()), null_decisions=null_arr, alt_decisions=alt_arr, ) # ------------------------------------------------------------------ # Main entry point # ------------------------------------------------------------------
[docs] def run( self, experiment: BaseExperiment, context: PipelineContext | None = None, ) -> CheckResult: """Run placebo-in-time analysis with hierarchical null model. Creates ``n_folds`` placebo experiments by shifting the treatment time backward. Extracts posterior cumulative impact from each fold, then fits a hierarchical Bayesian model to characterise the status-quo distribution. Compares the actual intervention effect against this null. When ``expected_effect_prior`` was provided at construction, also runs Bayesian assurance simulation. Can be used standalone (``context=None``) when ``experiment_factory`` was provided, or within a pipeline. Returns ------- CheckResult With ``passed`` indicating whether the actual effect is clearly outside the null distribution, and rich metadata including the null samples and optional assurance results. """ self.validate(experiment) factory = self._get_factory(context) treatment_time = experiment.treatment_time # type: ignore[attr-defined] data = experiment.data # type: ignore[attr-defined] intervention_length = self._compute_intervention_length(experiment) actual_cumulative = self._extract_cumulative_impact(experiment) actual_cumulative_mean = float(actual_cumulative.mean().values) fold_treatment_times = self._compute_fold_treatment_times( treatment_time, intervention_length ) fold_results: list[PlaceboFoldResult] = [] fold_summaries: list[str] = [] skipped_folds: list[int] = [] for fold_idx, pseudo_tt in enumerate(fold_treatment_times): fold_num = fold_idx + 1 logger.info( "PlaceboInTime fold %d/%d: pseudo_treatment_time=%s", fold_num, self.n_folds, pseudo_tt, ) fold_data = self._get_fold_data(data, pseudo_tt, intervention_length) if len(fold_data) < MIN_FOLD_OBSERVATIONS: logger.warning( "Fold %d has only %d observations (minimum %d), skipping.", fold_num, len(fold_data), MIN_FOLD_OBSERVATIONS, ) skipped_folds.append(fold_num) fold_summaries.append( f"Fold {fold_num}: SKIPPED (only {len(fold_data)} " f"observations, need >= {MIN_FOLD_OBSERVATIONS})" ) continue try: fold_experiment = factory(fold_data, pseudo_tt) cum_samples = self._extract_cumulative_impact(fold_experiment) f_mean = float(cum_samples.mean().values) f_sd = float(cum_samples.std().values) except Exception: logger.warning( "Fold %d failed to fit (pseudo_treatment_time=%s), skipping.", fold_num, pseudo_tt, exc_info=True, ) skipped_folds.append(fold_num) fold_summaries.append( f"Fold {fold_num}: SKIPPED (experiment failed to fit " f"at pseudo treatment time {pseudo_tt})" ) continue fold_result = PlaceboFoldResult( fold=fold_num, pseudo_treatment_time=pseudo_tt, experiment=fold_experiment, cumulative_impact_samples=cum_samples, fold_mean=f_mean, fold_sd=f_sd, ) fold_results.append(fold_result) fold_summaries.append( f"Fold {fold_num}: pseudo treatment at {pseudo_tt} " f"— mean={f_mean:.2f}, sd={f_sd:.2f}" ) n_completed = len(fold_results) n_skipped = len(skipped_folds) if n_completed < 1: parts = [ f"Placebo-in-time analysis: 0 folds completed ({n_skipped} skipped).", "INCONCLUSIVE — no folds completed.", ] parts.extend(fold_summaries) return CheckResult( check_name="PlaceboInTime", passed=None, text="\n".join(parts), metadata={"fold_results": fold_results}, ) fold_means = np.array([fr.fold_mean for fr in fold_results]) fold_sds = np.array([fr.fold_sd for fr in fold_results]) idata, theta_new_samples = self._build_status_quo_model(fold_means, fold_sds) p_outside = float( (np.abs(actual_cumulative_mean) > np.abs(theta_new_samples)).mean() ) passed = p_outside > self.threshold mu_post_mean = float(idata.posterior["mu_status_quo"].mean().values) tau_post_mean = float(idata.posterior["tau_status_quo"].mean().values) parts = [ f"Placebo-in-time analysis: {n_completed} of {self.n_folds} folds completed" ] if n_skipped: parts[0] += f" ({n_skipped} skipped)" parts[0] += "." parts.append( f"Hierarchical status-quo model: " f"mu={mu_post_mean:.2f}, tau={tau_post_mean:.2f}." ) parts.append( f"Actual cumulative impact: {actual_cumulative_mean:.2f}. " f"P(actual outside null) = {p_outside:.3f}." ) if passed: parts.append("SUPPORTED — actual effect is outside the null distribution.") else: parts.append( "NOT SUPPORTED — actual effect is within the null distribution." ) parts.extend(fold_summaries) text = "\n".join(parts) metadata: dict[str, Any] = { "fold_results": fold_results, "status_quo_idata": idata, "null_samples": theta_new_samples, "actual_cumulative_mean": actual_cumulative_mean, "p_effect_outside_null": p_outside, } n_posterior_samples = len(actual_cumulative.values) if self.expected_effect_prior is not None: assurance_result = self._compute_assurance( theta_new_samples, fold_sds, n_posterior_samples ) metadata["assurance_result"] = assurance_result metadata["assurance"] = assurance_result.true_positive_rate text += ( f"\n\nBayesian assurance (operating characteristics):\n" f" Under NULL (status quo true):\n" f" False Positive rate : " f"{assurance_result.false_positive_rate:.3f}\n" f" True Negative rate : " f"{assurance_result.true_negative_rate:.3f}\n" f" Indeterminate rate : " f"{assurance_result.null_indeterminate_rate:.3f}\n" f" Under ALTERNATIVE (expected effect true):\n" f" Assurance (TP rate) : " f"{assurance_result.true_positive_rate:.3f}\n" f" False Negative rate : " f"{assurance_result.false_negative_rate:.3f}\n" f" Indeterminate rate : " f"{assurance_result.alt_indeterminate_rate:.3f}" ) return CheckResult( check_name="PlaceboInTime", passed=passed, text=text, metadata=metadata, )
def __repr__(self) -> str: """Return a string representation of the check.""" parts = [f"n_folds={self.n_folds}"] if self.expected_effect_prior is not None: parts.append("assurance=True") return f"PlaceboInTime({', '.join(parts)})"