Pipeline Workflow#
CausalPy provides a composable pipeline API that chains causal inference steps into a single, reproducible workflow. Instead of manually calling experiment construction, sensitivity analysis, and report generation separately, you can define them as steps in a pipeline.
import pandas as pd
import causalpy as cp
Manual approach (before pipeline)#
Traditionally, a CausalPy analysis involves several sequential steps:
df = (
    cp.load_data("its")
    .assign(date=lambda x: pd.to_datetime(x["date"]))
    .set_index("date")
)
treatment_time = pd.to_datetime("2017-01-01")
seed = 42
model = cp.pymc_models.LinearRegression(sample_kwargs={"random_seed": seed})
# Step 1: Fit the experiment
result = cp.InterruptedTimeSeries(
    df,
    treatment_time,
    formula="y ~ 1 + t",
    model=model,
)
# Step 2: Get effect summary
summary = result.effect_summary()
print(summary.text)
Initializing NUTS using jitter+adapt_diag...
Multiprocess sampling (4 chains in 4 jobs)
NUTS: [beta, y_hat_sigma]
Sampling 4 chains for 1_000 tune and 1_000 draw iterations (4_000 + 4_000 draws total) took 1 seconds.
Sampling: [beta, y_hat, y_hat_sigma]
Sampling: [y_hat]
Sampling: [y_hat]
Sampling: [y_hat]
Sampling: [y_hat]
Post-period (2017-01-31 00:00:00 to 2019-12-31 00:00:00), the average effect was 0.99 (95% HDI [-3.30, 5.42]), with a posterior probability of an increase of 0.667. The cumulative effect was 35.47 (95% HDI [-118.92, 195.03]); probability of an increase 0.667. Relative to the counterfactual, this equals 1.92% on average (95% HDI [-5.68%, 10.24%]).
Pipeline approach#
The pipeline wraps these steps into a single, declarative workflow. Each step is configured upfront, and the pipeline validates everything before running.
df = (
    cp.load_data("its")
    .assign(date=lambda x: pd.to_datetime(x["date"]))
    .set_index("date")
)
result = cp.Pipeline(
    data=df,
    steps=[
        cp.EstimateEffect(
            method=cp.InterruptedTimeSeries,
            treatment_time=pd.to_datetime("2017-01-01"),
            formula="y ~ 1 + t",
            model=cp.pymc_models.LinearRegression(sample_kwargs={"random_seed": 42}),
        ),
        cp.GenerateReport(include_plots=False),
    ],
).run()
print("Experiment type:", type(result.experiment).__name__)
print("Effect summary available:", result.effect_summary is not None)
print("Report generated:", result.report is not None)
Initializing NUTS using jitter+adapt_diag...
Multiprocess sampling (4 chains in 4 jobs)
NUTS: [beta, y_hat_sigma]
Sampling 4 chains for 1_000 tune and 1_000 draw iterations (4_000 + 4_000 draws total) took 1 seconds.
Sampling: [beta, y_hat, y_hat_sigma]
Sampling: [y_hat]
Sampling: [y_hat]
Sampling: [y_hat]
Sampling: [y_hat]
Experiment type: InterruptedTimeSeries
Effect summary available: True
Report generated: True
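The validate-then-run behavior described above can be illustrated with a minimal, self-contained sketch. This is not CausalPy's implementation — the `Step`, `AddOne`, and `Double` classes below are toy placeholders — but it shows the pattern: every step is checked up front, then each step runs in order against a shared context.

```python
# Illustrative sketch of a validate-then-run step pipeline.
# Not CausalPy's actual implementation; all class names here are toys.
class Step:
    def validate(self, context):
        pass  # a real step would raise here if required inputs are missing

    def run(self, context):
        raise NotImplementedError


class AddOne(Step):  # stands in for a step like EstimateEffect
    def run(self, context):
        context["value"] = context.get("value", 0) + 1


class Double(Step):  # stands in for a step like GenerateReport
    def run(self, context):
        context["value"] *= 2


class Pipeline:
    def __init__(self, steps):
        self.steps = steps

    def run(self, context=None):
        context = context or {}
        for step in self.steps:  # validate every step first ...
            step.validate(context)
        for step in self.steps:  # ... then execute them in order
            step.run(context)
        return context


result = Pipeline([AddOne(), Double()]).run()
print(result["value"])  # 2
```

Validating all steps before executing any of them is what lets a pipeline fail fast on a misconfigured later step instead of discovering the problem after an expensive model fit.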
Adding sensitivity analysis#
The SensitivityAnalysis step runs a suite of diagnostic checks against the fitted experiment. Checks are pluggable, and you can choose which ones to run.
result = cp.Pipeline(
    data=df,
    steps=[
        cp.EstimateEffect(
            method=cp.InterruptedTimeSeries,
            treatment_time=pd.to_datetime("2017-01-01"),
            formula="y ~ 1 + t",
            model=cp.pymc_models.LinearRegression(sample_kwargs={"random_seed": 42}),
        ),
        cp.SensitivityAnalysis(
            checks=[
                cp.checks.PlaceboInTime(n_folds=2),
            ]
        ),
        cp.GenerateReport(include_plots=True),
    ],
).run()
print(f"Sensitivity checks run: {len(result.sensitivity_results)}")
for check_result in result.sensitivity_results:
    print(f" - {check_result.check_name}: {check_result.text[:80]}...")
Initializing NUTS using jitter+adapt_diag...
Multiprocess sampling (4 chains in 4 jobs)
NUTS: [beta, y_hat_sigma]
Sampling 4 chains for 1_000 tune and 1_000 draw iterations (4_000 + 4_000 draws total) took 1 seconds.
Sampling: [beta, y_hat, y_hat_sigma]
Sampling: [y_hat]
Sampling: [y_hat]
Sampling: [y_hat]
Sampling: [y_hat]
Fold 1 failed to fit (pseudo_treatment_time=2011-01-05 00:00:00), skipping.
Fold 2 failed to fit (pseudo_treatment_time=2014-01-03 00:00:00), skipping.
Sensitivity checks run: 1
- PlaceboInTime: Placebo-in-time analysis: 0 of 2 folds completed (2 skipped).
INCONCLUSIVE — no ...
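Pluggability here just means a check is any object the sensitivity step can call against the fitted experiment and get a result record back. The sketch below shows one way such a protocol could look — it is an illustration, not CausalPy's internals, and the `PlaceboInTime` body is a stub that only reports its configuration rather than re-fitting anything.

```python
# Illustrative sketch of a pluggable-check protocol.
# Not CausalPy's internals; PlaceboInTime here is a stub.
from dataclasses import dataclass


@dataclass
class CheckResult:
    check_name: str
    text: str


class PlaceboInTime:
    def __init__(self, n_folds=2):
        self.n_folds = n_folds

    def run(self, experiment):
        # A real implementation would re-fit at earlier pseudo-treatment
        # times; this stub only echoes its configuration.
        return CheckResult(
            check_name=type(self).__name__,
            text=f"Placebo-in-time analysis over {self.n_folds} folds.",
        )


def run_checks(experiment, checks):
    """Run each check against the experiment and collect the results."""
    return [check.run(experiment) for check in checks]


results = run_checks(experiment=None, checks=[PlaceboInTime(n_folds=2)])
print(results[0].check_name)  # PlaceboInTime
```

Because the sensitivity step only needs `run(experiment)`, new checks can be added without touching the pipeline itself.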
Available checks#
CausalPy provides a range of sensitivity checks, each applicable to specific experiment types:
| Check | Applicable methods | Description |
|---|---|---|
| PlaceboInTime | ITS, SC | Shifts treatment time backward to test for spurious effects |
|  | All Bayesian | Re-fits with different priors |
|  | SC | Validates treated values are within control range |
|  | ITS (3-period) | Checks if effects persist after intervention ends |
|  | Staggered DiD | Validates parallel trends via pre-treatment effects |
|  | RD, RKink | Re-fits with multiple bandwidths |
|  | SC | Drops each control unit and refits |
|  | SC | Treats each control as placebo treated |
|  | RD | Tests for running variable manipulation |
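Since each check applies only to certain experiment types, filtering a registry of checks by method is a natural pattern. The sketch below is one hypothetical way to encode that mapping; apart from `PlaceboInTime` (used earlier), the check names are invented placeholders, not CausalPy identifiers.

```python
# Illustrative sketch: filtering checks by the experiment types they support.
# Only PlaceboInTime is a real CausalPy name; the others are placeholders.
REGISTRY = {
    "PlaceboInTime": {"ITS", "SC"},
    "ExampleBandwidthCheck": {"RD", "RKink"},   # placeholder name
    "ExampleLeaveOneOutCheck": {"SC"},          # placeholder name
}


def applicable_checks(method, registry=REGISTRY):
    """Return, sorted, the names of checks that list `method` as applicable."""
    return sorted(name for name, methods in registry.items() if method in methods)


print(applicable_checks("SC"))  # ['ExampleLeaveOneOutCheck', 'PlaceboInTime']
```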
Pipeline result#
The PipelineResult object contains all accumulated outputs:
print("result.experiment ->", type(result.experiment).__name__)
print("result.effect_summary ->", type(result.effect_summary).__name__)
print("result.sensitivity_results ->", len(result.sensitivity_results), "checks")
print("result.report ->", "HTML" if result.report else "None")
result.experiment -> InterruptedTimeSeries
result.effect_summary -> EffectSummary
result.sensitivity_results -> 1 checks
result.report -> HTML
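One way to picture this accumulation is a result record whose fields start empty and are filled in only by the steps that actually ran — so a pipeline without a `GenerateReport` step simply leaves `report` as `None`. The dataclass below is a simplified sketch mirroring the attributes printed above, not CausalPy's actual class.

```python
# Illustrative sketch of an accumulating result object.
# Mirrors the attributes shown above in simplified form.
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class PipelineResult:
    experiment: Optional[Any] = None
    effect_summary: Optional[Any] = None
    sensitivity_results: list = field(default_factory=list)
    report: Optional[str] = None


result = PipelineResult(experiment="fitted-experiment")
result.sensitivity_results.append("check-result")
print(result.report is None)            # True: no report step ran
print(len(result.sensitivity_results))  # 1
```

Using `default_factory=list` (rather than a shared mutable default) ensures each pipeline run gets its own, independent list of check results.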
The effect summary provides both a table and prose:
if result.effect_summary is not None:
    print(result.effect_summary.text)
    display(result.effect_summary.table)
Post-period (2017-01-31 00:00:00 to 2019-12-31 00:00:00), the average effect was 0.99 (95% HDI [-3.30, 5.42]), with a posterior probability of an increase of 0.667. The cumulative effect was 35.47 (95% HDI [-118.92, 195.03]); probability of an increase 0.667. Relative to the counterfactual, this equals 1.92% on average (95% HDI [-5.68%, 10.24%]).
|  | mean | median | hdi_lower | hdi_upper | p_gt_0 | relative_mean | relative_hdi_lower | relative_hdi_upper |
|---|---|---|---|---|---|---|---|---|
| average | 0.985237 | 0.998458 | -3.303218 | 5.417612 | 0.66725 | 1.919774 | -5.684154 | 10.243178 |
| cumulative | 35.468549 | 35.944489 | -118.915839 | 195.034044 | 0.66725 | 1.919774 | -5.684154 | 10.243178 |
Viewing the HTML report#
The GenerateReport step renders the full analysis — effect summary, diagnostic plots, and sensitivity-check results — into a single HTML report stored in result.report. See the report generation notebook for the standalone (non-pipeline) API.
import html as html_module
import warnings
from IPython.display import HTML
with warnings.catch_warnings():
    warnings.filterwarnings(
        "ignore", "Consider using IPython.display.IFrame", UserWarning
    )
    report_widget = HTML(
        '<iframe srcdoc="'
        + html_module.escape(result.report)
        + '" width="100%" height="1000"'
        ' style="border: 1px solid #ddd; border-radius: 6px;"></iframe>'
    )
report_widget
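Outside a notebook, the same report string can simply be written to disk and opened in a browser. This assumes, as shown above, that `result.report` is a plain HTML string; the snippet below uses a stand-in string and a hypothetical file name.

```python
# Persisting the pipeline's HTML report to disk.
# `report_html` stands in for `result.report`; the file name is arbitrary.
from pathlib import Path

report_html = "<html><body><h1>Effect summary</h1></body></html>"

out_path = Path("causalpy_report.html")
out_path.write_text(report_html, encoding="utf-8")
print(out_path.exists())  # True
```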