DevOps · January 31, 2026 · 11 min read

ETL Pipeline Monitoring: Detect Silent Failures

Monitor your ETL pipelines with heartbeat checks. Get alerts when data pipelines fail, run too long, or produce unexpected results.

WizStatus Team

ETL (Extract, Transform, Load) pipelines are critical for data-driven organizations. When they fail silently, downstream reports, dashboards, and decisions suffer. Here's how to implement comprehensive monitoring for your data pipelines.

The ETL Monitoring Challenge

ETL failures are particularly insidious because:

  • Data appears normal - Old data still shows in reports
  • No immediate errors - Users don't see error pages
  • Delayed discovery - Often found days later
  • Cascading impact - Analytics, ML models, reports all affected
  • Complex dependencies - Multiple stages can fail independently

What to Monitor in ETL

Pipeline Completion

Did the entire pipeline finish?

import requests

PING_URL = "https://wizstatus.com/ping/TOKEN"

def run_pipeline():
    extract()
    transform()
    load()
    # Signal completion; an exception above skips the ping
    requests.get(PING_URL, timeout=30)

Stage Completion

Monitor each stage independently:

def run_pipeline():
    extract()
    requests.get(f"{PING_URL}/extract")

    transform()
    requests.get(f"{PING_URL}/transform")

    load()
    requests.get(f"{PING_URL}/load")

Record Counts

Alert if fewer records than expected:

def run_pipeline():
    records = extract()

    if len(records) < MINIMUM_EXPECTED:
        # Don't ping - suspicious data
        alert(f"Only {len(records)} records extracted")
        return

    transform(records)
    load(records)
    requests.get(PING_URL)

Pipeline Duration

Alert if taking too long:

import time

def run_pipeline():
    start = time.time()
    extract()
    transform()
    load()
    duration = time.time() - start

    requests.get(f"{PING_URL}?duration={duration}")

    if duration > MAX_DURATION:
        alert(f"Pipeline took {duration}s")

Monitoring Different ETL Tools

Apache Airflow

Use Airflow callbacks:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
import requests

def notify_success(context):
    requests.get("https://wizstatus.com/ping/TOKEN")

def notify_failure(context):
    requests.get("https://wizstatus.com/ping/TOKEN/fail")

dag = DAG(
    'daily_etl',
    schedule_interval='0 2 * * *',
    start_date=datetime(2026, 1, 1),
    on_success_callback=notify_success,
    on_failure_callback=notify_failure
)

Per-task monitoring:

extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract_data,
    on_success_callback=lambda ctx: requests.get(PING_EXTRACT),
    dag=dag
)

dbt (Data Build Tool)

dbt's on-run-end hooks execute SQL against your warehouse, so they can't send an HTTP heartbeat directly. Instead, wrap the dbt command in a script that pings only when dbt exits successfully:

#!/bin/bash
dbt run --target prod

if [ $? -eq 0 ]; then
  curl -fsS https://wizstatus.com/ping/dbt-run
fi

Luigi

Use event handlers:

import luigi
import requests

@luigi.Task.event_handler(luigi.Event.SUCCESS)
def on_success(task):
    if isinstance(task, FinalTask):
        requests.get("https://wizstatus.com/ping/TOKEN")

@luigi.Task.event_handler(luigi.Event.FAILURE)
def on_failure(task, exception):
    # Failure will be caught by missing ping
    pass

Prefect

Use state change hooks (define the hook before referencing it in the decorator):

from prefect import flow
import requests

def notify_complete(flow, flow_run, state):
    if state.is_completed():
        requests.get("https://wizstatus.com/ping/TOKEN")

@flow(on_completion=[notify_complete])
def etl_pipeline():
    extract()
    transform()
    load()

Dagster

Use sensors and hooks:

from dagster import job, success_hook
import requests

@success_hook
def heartbeat_hook(context):
    requests.get("https://wizstatus.com/ping/TOKEN")

@job(hooks={heartbeat_hook})
def etl_job():
    load(transform(extract()))

Pipeline Quality Checks

Beyond completion, verify data quality:

Row Count Validation

def validate_and_ping():
    count = db.execute("SELECT COUNT(*) FROM target_table")[0][0]

    if count < MINIMUM_ROWS:
        alert(f"Only {count} rows in target")
        return

    requests.get(PING_URL)

Freshness Check

from datetime import datetime, timedelta

def validate_freshness():
    latest = db.execute("""
        SELECT MAX(updated_at) FROM target_table
    """)[0][0]

    if latest < datetime.now() - timedelta(hours=1):
        alert("Data is stale")
        return

    requests.get(PING_URL)

Schema Validation

def validate_schema():
    expected_columns = {'id', 'name', 'value', 'timestamp'}
    actual_columns = set(get_table_columns('target'))

    if expected_columns != actual_columns:
        alert(f"Schema mismatch: {expected_columns - actual_columns}")
        return

    requests.get(PING_URL)

Monitoring Architecture

Simple Pipeline

Extract → Transform → Load → Ping

One monitor, one ping at the end.

Multi-Stage Pipeline

Extract → Ping/extract
    ↓
Transform → Ping/transform
    ↓
Load → Ping/load
    ↓
Validate → Ping/complete

Multiple monitors, one per stage.
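The per-stage pattern can be extended so a failing stage reports itself immediately rather than waiting for a missed ping. A minimal sketch: the `ping` callable and the `/fail` suffix follow the conventions shown earlier, and the stage functions are stand-ins.

```python
def run_stage(name, stage_fn, ping):
    """Run one pipeline stage; ping its monitor on success,
    its fail endpoint on any exception."""
    try:
        result = stage_fn()
    except Exception:
        ping(f"{name}/fail")   # report the failing stage explicitly
        raise                  # still abort the rest of the pipeline
    ping(name)                 # heartbeat for this stage
    return result
```

In production, `ping` would be something like `lambda path: requests.get(f"{PING_URL}/{path}", timeout=10)`.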

Parallel Pipeline

Source A ───Extract─→ Transform ┐
                                ├─→ Load → Ping
Source B ───Extract─→ Transform ┘

Ping only when all branches complete.
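A hedged sketch of the fan-in logic: run each branch concurrently and ping only once every branch has finished and the combined load succeeded. The branch callables, `load`, and `ping` are hypothetical stand-ins for your own functions.

```python
from concurrent.futures import ThreadPoolExecutor

def run_parallel(branches, load, ping):
    """branches: list of callables, each doing extract + transform
    for one source and returning its records."""
    with ThreadPoolExecutor() as pool:
        # Run all branches concurrently; .map re-raises if any branch
        # failed, so a failing branch means no ping (missed-ping alert).
        batches = list(pool.map(lambda fn: fn(), branches))
    records = [row for batch in batches for row in batch]
    load(records)
    ping()  # reached only when every branch and the load completed
    return len(records)
```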

Setting Up Monitors

Daily Pipeline at 2 AM

  • Schedule: Daily at 02:00
  • Grace period: 2-3 hours (depending on data volume)
  • Alerts: Email + Slack

Hourly Incremental

  • Schedule: Every hour
  • Grace period: 30 minutes
  • Alerts: Slack only (avoid alert fatigue)

Real-Time Streaming

For streaming pipelines, ping periodically:

  • Schedule: Every 5 minutes
  • Grace period: 10 minutes
  • Alerts: PagerDuty (critical)
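A streaming consumer has no natural "end of run" to ping, so a background thread can emit the heartbeat on a fixed interval, skipping it whenever the consumer looks unhealthy. A sketch, assuming you can supply an `is_healthy` check (e.g. "consumed a message in the last few minutes"):

```python
import threading

def start_heartbeat(ping, is_healthy, interval_seconds=300):
    """Ping every interval while the stream is healthy; a stalled
    consumer stops pinging and triggers a missed-heartbeat alert."""
    stop = threading.Event()

    def loop():
        while not stop.is_set():
            if is_healthy():
                ping()
            stop.wait(interval_seconds)  # sleep, but wake early on stop

    threading.Thread(target=loop, daemon=True).start()
    return stop  # call stop.set() on shutdown
```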

Error Handling

Partial Success

def run_with_partial_success():
    results = []
    for source in sources:
        try:
            results.append(process(source))
        except Exception as e:
            log_error(source, e)

    if len(results) > len(sources) * 0.9:  # 90% success
        requests.get(PING_URL)
    else:
        # Too many failures - don't ping
        alert(f"Only {len(results)}/{len(sources)} succeeded")

Retry Logic

from tenacity import retry, stop_after_attempt

@retry(stop=stop_after_attempt(3))
def run_pipeline():
    extract()
    transform()
    load()
    requests.get(PING_URL, timeout=30)

Dashboard Integration

Send metrics with your ping:

def run_pipeline():
    start = time.time()
    records = extract()
    transform(records)
    load(records)
    duration = time.time() - start

    requests.get(PING_URL, params={
        'records': len(records),
        'duration': int(duration),
        'source': 'production'
    })

Best Practices

  1. Monitor end-to-end - Don't just ping on extract
  2. Include validation - Ping only after quality checks
  3. Track duration - Alert on unexpected slowness
  4. Monitor record counts - Empty pipelines shouldn't ping
  5. Separate environments - Different monitors for prod/staging
  6. Document dependencies - Know what relies on each pipeline
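For practice 5, one way to keep environments separate is to resolve the monitor URL from the runtime environment, so a staging run can never ping the production monitor. A minimal sketch; the token values and the `ETL_ENV` variable are placeholders:

```python
import os

# Hypothetical per-environment monitor URLs
PING_URLS = {
    "production": "https://wizstatus.com/ping/prod-TOKEN",
    "staging": "https://wizstatus.com/ping/staging-TOKEN",
}

def heartbeat_url(env=None):
    """Pick the monitor for the current environment; default to
    staging so a misconfigured job can't silence the prod monitor."""
    env = env or os.environ.get("ETL_ENV", "staging")
    if env not in PING_URLS:
        raise ValueError(f"no monitor configured for environment: {env}")
    return PING_URLS[env]
```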

Checklist

  • All critical pipelines have monitors
  • Grace periods account for maximum run time
  • Quality validation before pinging
  • Record count thresholds defined
  • Duration thresholds defined
  • Staging and production separated
  • Alert channels configured appropriately
  • Pipeline documentation updated

Never let your ETL pipelines fail silently. Set up heartbeat monitoring with WizStatus and get alerts when data doesn't flow as expected.

Related Articles

How to Monitor Backup Jobs and Get Alerts on Failure (Best Practices · 10 min read)
Set up reliable monitoring for your database and file backups. Get instant alerts when backup jobs fail, run too long, or don't run at all.

How to Monitor Cron Jobs: Step-by-Step Guide (Tutorials · 10 min read)
Learn how to set up monitoring for your cron jobs. Get alerts when scheduled tasks fail, run too long, or don't run at all.

Dead Man's Switch: Ensure Critical Jobs Never Fail Silently (Monitoring · 9 min read)
Understand dead man's switch monitoring for critical systems. Learn how to implement fail-safe alerting for jobs that must run reliably.

Start monitoring your infrastructure today

Put these insights into practice with WizStatus monitoring.

Try WizStatus Free