The Data Pipeline Is Lying to You

A data pipeline that is broken loudly is a gift. The job fails. The alert fires. Someone investigates. The problem is found and fixed. The window of damage is measured in hours.

The pipelines that cause the most damage are the ones that are broken quietly. They run to completion. They produce output. The output looks reasonable. Downstream systems consume it. Models train on it. Decisions get made from it. Reports go to stakeholders. And somewhere upstream, invisible to everyone, something is wrong with the data.

I have spent more time than I would like investigating ML system degradation that turned out to be a data problem. Not a model problem. Not an infrastructure problem. The data was wrong in a way that nothing was checking for, and the wrongness had been accumulating for weeks or months before it manifested as something visible enough to investigate.

The gap between when a data problem starts and when it becomes visible is where the real damage happens. Closing that gap requires understanding where pipelines lie, how they lie, and what checks actually catch them.

How pipelines lie without failing

A pipeline lies whenever its output does not accurately represent the reality it is supposed to capture. This happens in more ways than most teams plan for.

Temporal contamination. A feature that encodes future information into a training example. The most common form: a feature computed using data that would not have been available at prediction time. A churn model is trained on a label that covers the thirty days after the prediction date, and the features include activity from those same thirty days. The model learns from the future. In training it looks excellent. In production, where the future is unavailable, it can perform little better than chance.

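from datetime import datetime

# compute_features and check_churn are assumed to be defined elsewhere
# in the training codebase.

# The training code that looks correct:
def build_training_examples(users: list, label_date: datetime) -> list: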
    examples = []
    for user in users:
        # Features computed up to label_date
        features = compute_features(user, as_of=label_date)

        # Label: did the user churn in the 30 days after label_date?
        churned = check_churn(user, after=label_date, window_days=30)

        examples.append({"features": features, "label": churned})
    return examples


# The contaminated version that is much more common:
def build_training_examples_contaminated(users: list, label_date: datetime) -> list:
    examples = []
    for user in users:
        features = compute_features(user, as_of=label_date)

        churned = check_churn(user, after=label_date, window_days=30)

        # The contamination: last_activity_date uses ALL available data,
        # not data as-of label_date.
        # For a user who churned on day 15, this captures activity from
        # the churn window that should be invisible at prediction time.
        features["last_activity_days_ago"] = (
            datetime.now() - user.last_activity_date
        ).days  # Should be: (label_date - user.last_activity_date).days

        examples.append({"features": features, "label": churned})
    return examples

This bug produces a model with suspiciously good training metrics. If your model performance looks too good, temporal contamination is the first thing to check. A model that genuinely cannot know the answer scoring near-perfect is a model that has been shown the answer.
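
One way to keep this class of bug out is to route every feature through a single point-in-time filter and to fail loudly when an input is newer than the cutoff. The sketch below assumes an event log with user_id and event_time columns; those column names and the helper names are illustrative, not taken from the pipeline described above.

```python
from datetime import datetime

import pandas as pd


def events_as_of(events: pd.DataFrame, as_of: datetime) -> pd.DataFrame:
    """Keep only events that had already happened at the cutoff."""
    return events[events["event_time"] <= as_of]


def last_activity_days_ago(events: pd.DataFrame, as_of: datetime) -> pd.Series:
    """Days between the cutoff and each user's last activity before it."""
    visible = events_as_of(events, as_of)
    last_seen = visible.groupby("user_id")["event_time"].max()
    return (pd.Timestamp(as_of) - last_seen).dt.days


def assert_no_future_events(events: pd.DataFrame, as_of: datetime) -> None:
    """Fail loudly if any feature input is newer than the cutoff."""
    leaked = int((events["event_time"] > as_of).sum())
    if leaked:
        raise ValueError(f"{leaked} events are later than the cutoff {as_of}")


# Hypothetical event log: user 1 has activity after the label date.
events = pd.DataFrame({
    "user_id": [1, 1, 2],
    "event_time": pd.to_datetime(["2026-01-05", "2026-01-20", "2026-01-08"]),
})
label_date = datetime(2026, 1, 10)

# The feature only sees the 2026-01-05 event for user 1, not the later one.
feature = last_activity_days_ago(events, label_date)
```

Calling assert_no_future_events on the unfiltered log raises, which is the point: the guard belongs on whatever DataFrame is about to be turned into features.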

Silent schema drift. An upstream system changes the format of a field. The pipeline does not break. The field is still present. The values are different in a way that passes type validation but is semantically wrong.

# The upstream system changed how it encodes user status.
# Before: status is an integer. 1=active, 2=inactive, 3=suspended.
# After: status is a string. "active", "inactive", "suspended".
# The pipeline reads the field. No error.
# Python: float("active") raises ValueError, which would at least be loud.
# pandas: read_csv does not fail; it loads the mixed column as object dtype.
# A defensive pd.to_numeric(..., errors="coerce") then turns every string
# into NaN silently.
# The model receives NaN where it expected 1, 2, or 3.
# The model was never trained on NaN for this feature.
# The model produces predictions. They are wrong. Nothing broke.

import pandas as pd

df = pd.read_csv("user_status_data.csv")
df["status"] = pd.to_numeric(df["status"], errors="coerce")
print(df["status"].value_counts(dropna=False))
# NaN    45231   <-- this is the contamination
# 1.0    12043
# 2.0     8921
# 3.0     2341

# The pipeline sees 68,536 rows with a valid column.
# It processes them without error.
# It logs "processed 68,536 rows successfully."
# The model trains on data where 66% of rows have NaN for status.
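
One way to make this kind of drift visible is to measure how many non-null raw values fail numeric coercion before the coerced column is used. This is a minimal sketch, assuming the raw column is still available as strings. The function name coercion_loss and the one percent threshold are illustrative choices.

```python
import pandas as pd


def coercion_loss(raw: pd.Series) -> float:
    """Share of non-null raw values that become NaN under numeric coercion."""
    coerced = pd.to_numeric(raw, errors="coerce")
    lost = raw.notna() & coerced.isna()
    non_null = int(raw.notna().sum())
    return float(lost.sum()) / non_null if non_null else 0.0


# Hypothetical raw column mixing the old integer codes and the new strings.
raw_status = pd.Series(["active", "1", "2", "inactive", None, "3"])

loss = coercion_loss(raw_status)
if loss > 0.01:  # illustrative threshold, tune per feature
    print(f"status: {loss:.0%} of non-null values are not numeric")
```

A genuinely missing value does not count as loss here. Only values that were present and then destroyed by the cast do, which is the signature of an encoding change rather than ordinary sparsity.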

Aggregation window misalignment. A feature is defined as a rolling seven-day count. Both pipelines compute a rolling seven-day count, but the definition of “seven days” differs between the training pipeline and the serving pipeline by a timezone offset. UTC in training, local time in serving. For users in UTC-8, the window that ends at midnight local time ends eight hours later than the UTC window the training feature used, so it includes eight hours of activity the training window would have excluded and drops eight hours at the other end. For most users the difference is noise. For users active late at night, it systematically shifts which evenings are counted, and their feature values drift away from what the model was trained on.
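
The effect is easy to reproduce on a toy event log. The sketch below counts the same events over a seven-day window that ends at midnight UTC and over one that ends at midnight UTC-8. The event times are made up for illustration, and seven_day_count is a hypothetical helper, not the feature code of any real pipeline.

```python
from datetime import timedelta, timezone

import pandas as pd

UTC_MINUS_8 = timezone(timedelta(hours=-8))

# Hypothetical activity for one user, in local time: a midday event on each
# of eight days, plus three late-evening events on the final day.
local_times = (
    [f"2026-03-{day:02d} 12:00" for day in range(1, 9)]
    + ["2026-03-08 20:00", "2026-03-08 21:00", "2026-03-08 22:00"]
)
events = (
    pd.to_datetime(local_times)
    .tz_localize(UTC_MINUS_8)
    .tz_convert(timezone.utc)
)


def seven_day_count(events: pd.DatetimeIndex, as_of_day: str, tz) -> int:
    """Count events in the seven days ending at midnight of as_of_day in tz."""
    end = pd.Timestamp(as_of_day, tz=tz)
    start = end - pd.Timedelta(days=7)
    return int(((events >= start) & (events < end)).sum())


# Same events, same nominal day, different definition of midnight.
training_count = seven_day_count(events, "2026-03-09", timezone.utc)  # 7
serving_count = seven_day_count(events, "2026-03-09", UTC_MINUS_8)    # 10
```

The three late-evening events fall after midnight UTC but before midnight local time, so only the serving window sees them. A parity test that feeds identical events through both code paths and compares the counts would catch this before a model does.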

None of these failures raise exceptions. None of them prevent the pipeline from completing. None of them produce output that fails schema validation. They are silent corruptions that flow downstream and corrupt everything that depends on them.

The checks that catch what schemas miss

Schema validation is the most common form of data quality checking and the least sufficient. Checking that a field is present, has the right type, and falls within a plausible range catches a narrow slice of the ways data can be wrong.

The checks that actually catch silent corruption are statistical.

from dataclasses import dataclass, field
from typing import Callable, Optional
import numpy as np
import pandas as pd
from scipy import stats
import logging

log = logging.getLogger(__name__)


@dataclass
class ValidationResult:
    check_name: str
    passed: bool
    severity: str  # "info", "warning", "critical"
    message: str
    details: dict = field(default_factory=dict)


class DataPipelineValidator:
    """
    Statistical validation for data pipelines.
    These checks catch what schema validation misses.
    """

    def __init__(self, reference_stats: dict):
        """
        reference_stats: statistics computed on a known-good dataset.
        Computed once during a stable period and stored.
        Every subsequent pipeline run is compared against this baseline.
        """
        self.reference = reference_stats
        self.results: list[ValidationResult] = []

    def check_null_rate(
        self,
        series: pd.Series,
        feature_name: str,
        max_allowed_null_rate: float = 0.02,
    ) -> ValidationResult:
        null_rate = series.isna().mean()
        reference_null_rate = self.reference.get(f"{feature_name}_null_rate", 0.0)

        # Two failure modes: absolute threshold and relative change
        absolute_violation = null_rate > max_allowed_null_rate
        relative_spike = (
            reference_null_rate > 0
            and null_rate > reference_null_rate * 3
        )

        passed = not (absolute_violation or relative_spike)

        return ValidationResult(
            check_name=f"null_rate:{feature_name}",
            passed=passed,
            severity="critical" if not passed else "info",
            message=(
                f"{feature_name} null rate is {null_rate:.2%}, "
                f"reference is {reference_null_rate:.2%}"
            ),
            details={
                "current_null_rate": null_rate,
                "reference_null_rate": reference_null_rate,
                "absolute_violation": absolute_violation,
                "relative_spike": relative_spike,
            },
        )

    def check_distribution_shift(
        self,
        series: pd.Series,
        feature_name: str,
        p_value_threshold: float = 0.01,
    ) -> ValidationResult:
        reference_values = self.reference.get(f"{feature_name}_sample")
        if reference_values is None:
            return ValidationResult(
                check_name=f"distribution:{feature_name}",
                passed=True,
                severity="info",
                message=f"No reference distribution for {feature_name}",
                details={},
            )

        current_values = series.dropna().values
        if len(current_values) < 100:
            return ValidationResult(
                check_name=f"distribution:{feature_name}",
                passed=True,
                severity="info",
                message=f"Insufficient samples for distribution check",
                details={"sample_size": len(current_values)},
            )

        ks_stat, p_value = stats.ks_2samp(reference_values, current_values)
        passed = p_value >= p_value_threshold

        return ValidationResult(
            check_name=f"distribution:{feature_name}",
            passed=passed,
            severity="warning" if not passed else "info",
            message=(
                f"{feature_name} distribution shifted "
                f"(KS={ks_stat:.4f}, p={p_value:.6f})"
            ),
            details={
                "ks_statistic": float(ks_stat),
                "p_value": float(p_value),
                "current_mean": float(np.mean(current_values)),
                "reference_mean": float(np.mean(reference_values)),
                "current_std": float(np.std(current_values)),
                "reference_std": float(np.std(reference_values)),
            },
        )

    def check_cardinality(
        self,
        series: pd.Series,
        feature_name: str,
        max_new_categories_pct: float = 0.05,
    ) -> ValidationResult:
        reference_categories = set(
            self.reference.get(f"{feature_name}_categories", [])
        )
        if not reference_categories:
            return ValidationResult(
                check_name=f"cardinality:{feature_name}",
                passed=True,
                severity="info",
                message=f"No reference categories for {feature_name}",
                details={},
            )

        current_categories = set(series.dropna().unique())
        new_categories = current_categories - reference_categories
        missing_categories = reference_categories - current_categories

        new_pct = len(new_categories) / max(len(reference_categories), 1)
        passed = new_pct <= max_new_categories_pct

        return ValidationResult(
            check_name=f"cardinality:{feature_name}",
            passed=passed,
            severity="warning" if not passed else "info",
            message=(
                f"{feature_name}: {len(new_categories)} new categories, "
                f"{len(missing_categories)} missing categories"
            ),
            details={
                "new_categories": list(new_categories)[:20],
                "missing_categories": list(missing_categories)[:20],
                "new_category_pct": new_pct,
            },
        )

    def check_temporal_monotonicity(
        self,
        timestamp_series: pd.Series,
        feature_name: str = "timestamp",
    ) -> ValidationResult:
        # Parse with utc=True so naive and tz-aware inputs both end up as
        # tz-aware UTC (naive values are assumed to already be in UTC).
        # Comparing a naive series against a tz-aware "now" raises TypeError.
        timestamp_series = pd.to_datetime(
            timestamp_series, errors="coerce", utc=True
        )

        is_monotonic = bool(timestamp_series.is_monotonic_increasing)
        null_timestamps = timestamp_series.isna().sum()
        future_timestamps = (
            timestamp_series > pd.Timestamp.now(tz="UTC")
        ).sum()

        passed = is_monotonic and null_timestamps == 0 and future_timestamps == 0

        return ValidationResult(
            check_name=f"temporal_monotonicity:{feature_name}",
            passed=passed,
            severity="critical" if not passed else "info",
            message=(
                f"Temporal check: monotonic={is_monotonic}, "
                f"null_timestamps={null_timestamps}, "
                f"future_timestamps={future_timestamps}"
            ),
            details={
                "is_monotonic": is_monotonic,
                "null_timestamps": int(null_timestamps),
                "future_timestamps": int(future_timestamps),
            },
        )

    def run_all_checks(self, df: pd.DataFrame) -> list[ValidationResult]:
        results = []
        for column in df.columns:
            series = df[column]
            results.append(self.check_null_rate(series, column))
            if pd.api.types.is_numeric_dtype(series):
                results.append(self.check_distribution_shift(series, column))
            elif pd.api.types.is_object_dtype(series):
                results.append(self.check_cardinality(series, column))

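        # Match the "_at" suffix rather than the substring "at", which would
        # also select columns such as "status", "category", or "rate".
        timestamp_cols = [
            c for c in df.columns
            if "time" in c.lower()
            or "date" in c.lower()
            or c.lower().endswith("_at")
        ]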
        for col in timestamp_cols:
            results.append(self.check_temporal_monotonicity(df[col], col))

        critical = [r for r in results if not r.passed and r.severity == "critical"]
        warnings = [r for r in results if not r.passed and r.severity == "warning"]

        if critical:
            # stdlib logging rejects arbitrary keyword arguments, so the
            # fields are passed as %-style arguments instead.
            log.error(
                "pipeline.validation.critical_failures count=%d failures=%s",
                len(critical),
                [r.check_name for r in critical],
            )
        if warnings:
            log.warning(
                "pipeline.validation.warnings count=%d warnings=%s",
                len(warnings),
                [r.check_name for r in warnings],
            )

        return results

    def should_halt_pipeline(self, results: list[ValidationResult]) -> bool:
        critical_failures = [
            r for r in results if not r.passed and r.severity == "critical"
        ]
        return len(critical_failures) > 0

The key insight in this validator is the reference baseline. Every check compares the current pipeline run against statistics computed during a known-good period. This catches changes that look fine in isolation but are different from what the system expects.

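A null rate of 1.5 percent might be acceptable for a particular feature, and it sits under the two percent absolute threshold. A null rate that held at 0.1 percent for six months and is now 1.5 percent is not acceptable. The absolute check misses this. The relative check catches it. One caveat: as written, the relative check is skipped when the reference null rate is exactly zero, so a feature with a spotless baseline relies on the absolute threshold alone.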
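
The validator reads its baseline from keys named after each feature with the suffixes _null_rate, _sample, and _categories. A minimal sketch of how such a reference could be computed from a known-good DataFrame follows. The function name build_reference_stats and the sample size are assumptions for illustration.

```python
import pandas as pd


def build_reference_stats(df: pd.DataFrame, sample_size: int = 5000) -> dict:
    """Compute baseline statistics under the keys the validator looks up."""
    reference: dict = {}
    for column in df.columns:
        series = df[column]
        non_null = series.dropna()
        reference[f"{column}_null_rate"] = float(series.isna().mean())

        if pd.api.types.is_numeric_dtype(series):
            # Store a bounded sample for the two-sample KS comparison.
            n = min(sample_size, len(non_null))
            reference[f"{column}_sample"] = (
                non_null.sample(n, random_state=0).to_numpy().tolist()
            )
        elif (
            pd.api.types.is_object_dtype(series)
            or pd.api.types.is_string_dtype(series)
        ):
            reference[f"{column}_categories"] = sorted(non_null.unique().tolist())
    return reference


# Hypothetical known-good data from a stable period.
known_good = pd.DataFrame({
    "age": [31, 45, None, 28],
    "plan": ["free", "pro", "pro", None],
})
reference_stats = build_reference_stats(known_good)
```

The result contains only plain Python types, so it can be written to JSON and versioned alongside the pipeline code. Recomputing it should be a deliberate act, because a baseline refreshed on bad data makes the bad data look normal.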

The join that loses rows silently

Joins are where data pipelines lose data most quietly. A left join that should be an inner join. An inner join that drops rows when it should be a left join. A join on a key that has duplicates in one table, producing a cartesian explosion that inflates row counts. A join on a string key where whitespace differences prevent matching.

Teams write tests for the output schema of joins. They rarely write tests for the row counts.

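import logging

import pandas as pd
from dataclasses import dataclass

log = logging.getLogger(__name__)


class DataQualityError(Exception):
    """Raised when a join or dataset fails a data quality check."""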


@dataclass
class JoinAudit:
    left_rows: int
    right_rows: int
    result_rows: int
    unmatched_left: int
    unmatched_right: int
    duplicate_key_left: int
    duplicate_key_right: int


def audited_join(
    left: pd.DataFrame,
    right: pd.DataFrame,
    on: str,
    how: str = "inner",
    expected_match_rate: float = 0.95,
) -> tuple[pd.DataFrame, JoinAudit]:
    """
    Performs a join and audits the result for silent data loss.
    Raises an error if the match rate falls below the expected threshold.
    """
    left_keys = set(left[on].dropna())
    right_keys = set(right[on].dropna())

    duplicate_key_left = int(left[on].duplicated().sum())
    duplicate_key_right = int(right[on].duplicated().sum())

    result = left.merge(right, on=on, how=how)

    # Count unmatched keys on both sides regardless of join type. An inner
    # join drops them; a left or right join keeps them as null-filled rows.
    # Either way they did not match, and the audit should say so.
    unmatched_left = len(left_keys - right_keys)
    unmatched_right = len(right_keys - left_keys)

    audit = JoinAudit(
        left_rows=len(left),
        right_rows=len(right),
        result_rows=len(result),
        unmatched_left=unmatched_left,
        unmatched_right=unmatched_right,
        duplicate_key_left=duplicate_key_left,
        duplicate_key_right=duplicate_key_right,
    )

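    if how in ("inner", "left"):
        # Measure the match rate on left rows whose key exists in the right
        # table. len(result) / len(left) is inflated by duplicate keys on the
        # right, and a left join never drops left rows, so a row-count ratio
        # can hide unmatched keys in both cases.
        match_rate = float(left[on].isin(right_keys).mean()) if len(left) else 1.0
        if match_rate < expected_match_rate:
            raise DataQualityError(
                f"Join on '{on}' matched {match_rate:.1%} of left rows, "
                f"expected at least {expected_match_rate:.1%}. "
                f"Unmatched left keys: {len(left_keys - right_keys)}. "
                f"Check for key format changes or missing data in right table."
            )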

    if duplicate_key_left > 0 or duplicate_key_right > 0:
        # stdlib logging rejects arbitrary keyword arguments, so the fields
        # are passed as %-style arguments instead.
        log.warning(
            "join.duplicate_keys_detected join_key=%s duplicate_key_left=%d "
            "duplicate_key_right=%d result_rows=%d expected_rows_approx=%d",
            on,
            duplicate_key_left,
            duplicate_key_right,
            len(result),
            max(len(left), len(right)),
        )

    return result, audit

The match rate check is the one that catches the most real problems. A join that has historically matched ninety-eight percent of rows and matches seventy percent today is a signal that something upstream changed. The join succeeded. The data is wrong.
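
pandas can do part of this audit itself. The merge method accepts indicator=True, which labels every output row by where its key was found, and validate, which raises if the key relationship is not what was declared. The sketch below uses made-up tables in which one key carries trailing whitespace.

```python
import pandas as pd

# Hypothetical tables: the last left key has trailing whitespace.
left = pd.DataFrame({"user_id": ["a1", "a2", "a3 "], "spend": [10, 20, 30]})
right = pd.DataFrame({"user_id": ["a1", "a2", "a3"], "region": ["eu", "us", "us"]})

merged = left.merge(
    right,
    on="user_id",
    how="left",
    indicator=True,          # adds _merge: both / left_only / right_only
    validate="many_to_one",  # raises MergeError if right has duplicate keys
)

# Rows that survived the left join but found no partner on the right.
left_only = int((merged["_merge"] == "left_only").sum())

# Normalising the key before joining removes the silent mismatch.
fixed = left.assign(user_id=left["user_id"].str.strip()).merge(
    right, on="user_id", how="left", indicator=True, validate="many_to_one"
)
all_matched = bool((fixed["_merge"] == "both").all())
```

The left join returns three rows in both cases, so a row-count test passes either way. Only the indicator column shows that one row came back with a null region.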

Data lineage as operational infrastructure

When a data quality problem is discovered, the first question is always: where did this data come from and when did it start being wrong?

Without data lineage tracking, answering this question requires archaeology. You trace back through pipeline code, check timestamps on intermediate outputs, look at git history for changes that might have introduced the problem, and eventually form a hypothesis that you cannot fully verify.

With data lineage tracking, the same question is a query.

import hashlib
import json
import logging
from datetime import datetime, timezone
from dataclasses import dataclass, field
from typing import Optional
import sqlite3

log = logging.getLogger(__name__)


@dataclass
class LineageRecord:
    dataset_id: str
    dataset_name: str
    created_at: str
    pipeline_version: str
    source_datasets: list[str]
    row_count: int
    checksum: str
    validation_results: dict
    metadata: dict = field(default_factory=dict)


class DataLineageTracker:
    """
    Records the provenance of every dataset produced by every pipeline run.
    When something is wrong, you can trace it to its source.
    """

    def __init__(self, db_path: str = "lineage.db"):
        self.conn = sqlite3.connect(db_path)
        self._init_schema()

    def _init_schema(self) -> None:
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS lineage (
                dataset_id TEXT PRIMARY KEY,
                dataset_name TEXT NOT NULL,
                created_at TEXT NOT NULL,
                pipeline_version TEXT NOT NULL,
                source_datasets TEXT NOT NULL,
                row_count INTEGER NOT NULL,
                checksum TEXT NOT NULL,
                validation_passed INTEGER NOT NULL,
                validation_results TEXT NOT NULL,
                metadata TEXT NOT NULL
            )
        """)
        self.conn.commit()

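    def compute_checksum(self, df) -> str:
        # Fingerprint of a fixed random sample, not of the full dataset:
        # cheap to compute, but changes outside the sampled rows go unseen.
        sample = df.sample(min(1000, len(df)), random_state=42)
        content = sample.to_json()
        return hashlib.sha256(content.encode()).hexdigest()[:16]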

    def record(
        self,
        df,
        dataset_name: str,
        pipeline_version: str,
        source_dataset_ids: list[str],
        validation_results: list,
        metadata: Optional[dict] = None,
    ) -> str:
        import uuid
        dataset_id = str(uuid.uuid4())
        created_at = datetime.now(timezone.utc).isoformat()
        checksum = self.compute_checksum(df)
        validation_passed = all(r.passed for r in validation_results)

        self.conn.execute("""
            INSERT INTO lineage VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            dataset_id,
            dataset_name,
            created_at,
            pipeline_version,
            json.dumps(source_dataset_ids),
            len(df),
            checksum,
            int(validation_passed),
            json.dumps([{
                "check": r.check_name,
                "passed": r.passed,
                "message": r.message,
            } for r in validation_results]),
            json.dumps(metadata or {}),
        ))
        self.conn.commit()

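        # stdlib logging rejects arbitrary keyword arguments, so the fields
        # are passed as %-style arguments instead.
        log.info(
            "lineage.recorded dataset_id=%s dataset_name=%s row_count=%d "
            "validation_passed=%s pipeline_version=%s",
            dataset_id,
            dataset_name,
            len(df),
            validation_passed,
            pipeline_version,
        )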

        return dataset_id

    def trace_ancestry(self, dataset_id: str, depth: int = 10) -> list[LineageRecord]:
        # Breadth-first walk up the lineage graph. `depth` limits how many
        # generations of ancestors are followed, not how many records are
        # returned, so a wide graph is not cut off after ten datasets.
        visited = set()
        result = []
        queue = [(dataset_id, 0)]

        while queue:
            current_id, level = queue.pop(0)
            if current_id in visited or level > depth:
                continue
            visited.add(current_id)

            row = self.conn.execute(
                "SELECT * FROM lineage WHERE dataset_id = ?",
                (current_id,)
            ).fetchone()

            if row is None:
                continue

            record = LineageRecord(
                dataset_id=row[0],
                dataset_name=row[1],
                created_at=row[2],
                pipeline_version=row[3],
                source_datasets=json.loads(row[4]),
                row_count=row[5],
                checksum=row[6],
                validation_results=json.loads(row[8]),
                metadata=json.loads(row[9]),
            )
            result.append(record)
            queue.extend(
                (source_id, level + 1) for source_id in record.source_datasets
            )

        return result

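    def find_first_failure(self, dataset_id: str) -> Optional[LineageRecord]:
        ancestry = self.trace_ancestry(dataset_id)
        # Breadth-first order is not chronological, so sort by creation time
        # (ISO 8601 UTC strings sort correctly as text) to find the earliest
        # dataset with a failed check.
        for record in sorted(ancestry, key=lambda r: r.created_at):
            failed_checks = [
                v for v in record.validation_results
                if not v.get("passed", True)
            ]
            if failed_checks:
                return record
        return None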

With lineage tracking in place, investigating a data problem changes from archaeology to a query:

tracker = DataLineageTracker()

# A model is producing bad predictions from dataset "churn_features_2026_05_27"
# Find the earliest ancestor where validation failed

dataset_id = "the-dataset-id-used-for-training"
first_failure = tracker.find_first_failure(dataset_id)

if first_failure:
    print(f"First failure detected in: {first_failure.dataset_name}")
    print(f"Created at: {first_failure.created_at}")
    print(f"Pipeline version: {first_failure.pipeline_version}")
    print(f"Failed checks:")
    for check in first_failure.validation_results:
        if not check["passed"]:
            print(f"  {check['check']}: {check['message']}")

The investigation that would have taken a day takes ten minutes. More importantly, the lineage record shows the pipeline version at the time of the failure, which points directly to the code change that introduced the problem.
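
The same table answers questions that validation never asked. The sketch below looks for runs where a dataset's row count fell sharply against the previous run, even though every check passed. It uses an in-memory table with a subset of the lineage columns. The rows, the dataset name, and the ten percent threshold are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE lineage (
        dataset_name TEXT NOT NULL,
        created_at TEXT NOT NULL,
        pipeline_version TEXT NOT NULL,
        row_count INTEGER NOT NULL,
        validation_passed INTEGER NOT NULL
    )
""")
# Illustrative rows only: three nightly runs of one dataset.
conn.executemany("INSERT INTO lineage VALUES (?, ?, ?, ?, ?)", [
    ("churn_features", "2026-05-25T02:00:00+00:00", "v41", 68536, 1),
    ("churn_features", "2026-05-26T02:00:00+00:00", "v41", 68410, 1),
    ("churn_features", "2026-05-27T02:00:00+00:00", "v42", 47120, 1),
])


def row_count_drops(conn, dataset_name: str, max_drop: float = 0.10) -> list:
    """Return runs whose row count fell by more than max_drop vs the prior run."""
    rows = conn.execute(
        "SELECT created_at, pipeline_version, row_count FROM lineage "
        "WHERE dataset_name = ? ORDER BY created_at",
        (dataset_name,),
    ).fetchall()
    drops = []
    for (_, _, previous), (created_at, version, current) in zip(rows, rows[1:]):
        if previous and (previous - current) / previous > max_drop:
            drops.append((created_at, version, previous, current))
    return drops


drops = row_count_drops(conn, "churn_features")
```

In this made-up history the only flagged run is the first one on pipeline version v42, which is the kind of pointer that turns a vague suspicion into a specific diff to read.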

The pipeline contract

The practices above share a structure worth naming explicitly.

Every pipeline run should be auditable. What data went in, what data came out, how many rows, what the distributions looked like, what the validation results were. Not for compliance. For debugging. The audit trail is the thing that makes silent failures loud in retrospect.

Every pipeline should have a definition of what acceptable output looks like, expressed as executable checks, not as documentation. Documentation describing expected row counts is ignored. Code that halts the pipeline when row counts are outside expected ranges is not.
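
An executable contract can be very small. The sketch below expresses an expected row-count range as code that stops the run. The class names, the dataset name, and the bounds are hypothetical, and a real contract would carry more than one kind of check.

```python
from dataclasses import dataclass


class ContractViolation(Exception):
    """Raised when a dataset breaks its declared contract."""


@dataclass(frozen=True)
class RowCountContract:
    dataset_name: str
    min_rows: int
    max_rows: int

    def enforce(self, row_count: int) -> None:
        """Raise instead of letting an out-of-range dataset flow downstream."""
        if not self.min_rows <= row_count <= self.max_rows:
            raise ContractViolation(
                f"{self.dataset_name}: {row_count} rows, expected between "
                f"{self.min_rows} and {self.max_rows}"
            )


# Hypothetical bounds derived from a stable period.
contract = RowCountContract("churn_features", min_rows=60_000, max_rows=80_000)

contract.enforce(68_536)  # within range, returns silently

try:
    contract.enforce(47_120)  # out of range, halts the run
    halted = False
except ContractViolation:
    halted = True
```

The bounds live next to the pipeline code and change through the same review process, which is what separates a contract from a comment.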

Every data problem that reaches a model is a pipeline failure, not a model failure. The model cannot defend itself against bad inputs. The pipeline can. The validation layer is the model’s immune system. Most models are running without one.

The pipeline that lies quietly is more dangerous than the pipeline that fails loudly, because the loud failure gets fixed. The quiet lie gets trained on.

Build the pipeline that tells the truth. Check that it is telling the truth on every run. Halt when it is not.

The model deserves inputs it was built to handle. Give it that, and most of what teams blame on the model will turn out to have been the pipeline all along.