ETL (Extract, Transform, Load) pipelines are critical for data-driven organizations. When they fail silently, downstream reports, dashboards, and decisions suffer. Here's how to implement comprehensive monitoring for your data pipelines.
The ETL Monitoring Challenge
ETL failures are particularly insidious because:
- **Data appears normal**: stale data still shows in reports
- **No immediate errors**: users don't see error pages
- **Delayed discovery**: failures are often found days later
- **Cascading impact**: analytics, ML models, and reports are all affected
- **Complex dependencies**: multiple stages can fail independently
What to Monitor in ETL
Pipeline Completion
Did the entire pipeline finish?
```python
import requests

def run_pipeline():
    extract()
    transform()
    load()
    # Signal completion
    requests.get(PING_URL)
```
Stage Completion
Monitor each stage independently:
```python
import requests

def run_pipeline():
    extract()
    requests.get(f"{PING_URL}/extract")
    transform()
    requests.get(f"{PING_URL}/transform")
    load()
    requests.get(f"{PING_URL}/load")
```
Record Counts
Alert if fewer records than expected:
```python
import requests

def run_pipeline():
    records = extract()
    if len(records) < MINIMUM_EXPECTED:
        # Don't ping - suspicious data
        alert(f"Only {len(records)} records extracted")
        return
    transform(records)
    load(records)
    requests.get(PING_URL)
```
Pipeline Duration
Alert if taking too long:
```python
import time

import requests

def run_pipeline():
    start = time.time()
    extract()
    transform()
    load()
    duration = time.time() - start
    requests.get(f"{PING_URL}?duration={duration}")
    if duration > MAX_DURATION:
        alert(f"Pipeline took {duration:.0f}s")
```
Monitoring Different ETL Tools
Apache Airflow
Use Airflow callbacks:
```python
from datetime import datetime

import requests
from airflow import DAG
from airflow.operators.python import PythonOperator

def notify_success(context):
    requests.get("https://wizstatus.com/ping/TOKEN")

def notify_failure(context):
    requests.get("https://wizstatus.com/ping/TOKEN/fail")

dag = DAG(
    'daily_etl',
    schedule_interval='0 2 * * *',
    start_date=datetime(2024, 1, 1),
    on_success_callback=notify_success,
    on_failure_callback=notify_failure,
)
```
Per-task monitoring:
```python
extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract_data,
    on_success_callback=lambda ctx: requests.get(PING_EXTRACT),
    dag=dag,
)
```
dbt (Data Build Tool)
Use dbt hooks. Note that `on-run-end` hooks execute SQL, so they suit in-database audit records rather than HTTP calls (the `etl_audit` table below is illustrative):

```yaml
# dbt_project.yml
on-run-end:
  - "INSERT INTO etl_audit (event, run_at) VALUES ('dbt_run_end', current_timestamp)"
```

For HTTP pings, wrap the dbt command instead:

```bash
#!/bin/bash
dbt run --target prod
if [ $? -eq 0 ]; then
  curl https://wizstatus.com/ping/dbt-run
fi
```
Luigi
Use event handlers:
```python
import luigi
import requests

@luigi.Task.event_handler(luigi.Event.SUCCESS)
def on_success(task):
    if isinstance(task, FinalTask):  # FinalTask = last task in your pipeline
        requests.get("https://wizstatus.com/ping/TOKEN")

@luigi.Task.event_handler(luigi.Event.FAILURE)
def on_failure(task, exception):
    # No ping on failure - the missed ping triggers the alert
    pass
```
Prefect
Use state handlers:
```python
import requests
from prefect import flow

# The hook must be defined before the decorator references it
def notify_complete(flow, flow_run, state):
    if state.is_completed():
        requests.get("https://wizstatus.com/ping/TOKEN")

@flow(on_completion=[notify_complete])
def etl_pipeline():
    extract()
    transform()
    load()
```
Dagster
Use sensors and hooks:
```python
import requests
from dagster import job, success_hook

@success_hook
def heartbeat_hook(context):
    requests.get("https://wizstatus.com/ping/TOKEN")

@job(hooks={heartbeat_hook})
def etl_job():
    load(transform(extract()))
```
Pipeline Quality Checks
Beyond completion, verify data quality:
Row Count Validation
```python
import requests

def validate_and_ping():
    count = db.execute("SELECT COUNT(*) FROM target_table")[0][0]
    if count < MINIMUM_ROWS:
        alert(f"Only {count} rows in target")
        return
    requests.get(PING_URL)
```
Freshness Check
```python
from datetime import datetime, timedelta

import requests

def validate_freshness():
    latest = db.execute("SELECT MAX(updated_at) FROM target_table")[0][0]
    if latest < datetime.now() - timedelta(hours=1):
        alert("Data is stale")
        return
    requests.get(PING_URL)
```
Schema Validation
```python
import requests

def validate_schema():
    expected_columns = {'id', 'name', 'value', 'timestamp'}
    actual_columns = set(get_table_columns('target'))
    if expected_columns != actual_columns:
        # Symmetric difference reports both missing and unexpected columns
        alert(f"Schema mismatch: {expected_columns ^ actual_columns}")
        return
    requests.get(PING_URL)
```
Monitoring Architecture
Simple Pipeline
```
Extract → Transform → Load → Ping
```
One monitor, one ping at the end.
Multi-Stage Pipeline
```
Extract   → Ping/extract
    ↓
Transform → Ping/transform
    ↓
Load      → Ping/load
    ↓
Validate  → Ping/complete
```
Multiple monitors, one per stage.
Parallel Pipeline
```
Source A ── Extract → Transform ─┐
                                 ├─→ Load → Ping
Source B ── Extract → Transform ─┘
```
Ping only when all branches complete.
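A minimal sketch of gating the final ping on every branch, using `concurrent.futures` from the standard library. The `run_branch` stub and the `ping` callable are illustrative stand-ins for your per-source extract/transform code and the HTTP ping:

```python
from concurrent.futures import ThreadPoolExecutor

def run_branch(source):
    # Stand-in for extract + transform of one source
    return f"{source}-transformed"

def run_parallel_pipeline(sources, ping):
    # map() blocks until every branch has finished (or raised),
    # so the ping below fires only when all branches completed
    with ThreadPoolExecutor() as pool:
        results = list(pool.map(run_branch, sources))
    # Load the combined results, then ping exactly once
    ping()
    return results
```

If any branch raises, `map()` re-raises the exception, the ping never fires, and the missed ping triggers the alert.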
Setting Up Monitors
Daily Pipeline at 2 AM
- Schedule: Daily at 02:00
- Grace period: 2-3 hours (depending on data volume)
- Alerts: Email + Slack
Hourly Incremental
- Schedule: Every hour
- Grace period: 30 minutes
- Alerts: Slack only (avoid alert fatigue)
Real-Time Streaming
For streaming pipelines, ping periodically:
- Schedule: Every 5 minutes
- Grace period: 10 minutes
- Alerts: PagerDuty (critical)
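A streaming consumer has no natural end-of-run to ping, so emit a heartbeat from the processing loop instead. A sketch of a small helper you could write yourself (the consumer loop and ping call in the comment are placeholders):

```python
import time

class Heartbeat:
    """Calls `ping` at most once per `interval` seconds from a hot loop."""

    def __init__(self, interval=300, now=time.monotonic):
        self.interval = interval
        self.now = now          # injectable clock, handy for testing
        self.last = now()

    def maybe_ping(self, ping):
        # Cheap enough to call on every message
        if self.now() - self.last >= self.interval:
            ping()
            self.last = self.now()
            return True
        return False

# In the consumer loop:
# hb = Heartbeat(interval=300)
# for message in stream:
#     process(message)
#     hb.maybe_ping(lambda: requests.get(PING_URL, timeout=10))
```

If the consumer stalls, the heartbeat stops and the monitor's 10-minute grace period catches it.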
Error Handling
Partial Success
```python
import requests

def run_with_partial_success():
    results = []
    for source in sources:
        try:
            results.append(process(source))
        except Exception as e:
            log_error(source, e)
    if len(results) >= len(sources) * 0.9:  # at least 90% success
        requests.get(PING_URL)
    else:
        # Too many failures - don't ping
        alert(f"Only {len(results)}/{len(sources)} succeeded")
```
Retry Logic
```python
import requests
from tenacity import retry, stop_after_attempt

@retry(stop=stop_after_attempt(3))
def run_pipeline():
    extract()
    transform()
    load()
    requests.get(PING_URL, timeout=30)
```
Dashboard Integration
Send metrics with your ping:
```python
import time

import requests

def run_pipeline():
    start = time.time()
    records = extract()
    transform(records)
    load(records)
    duration = time.time() - start
    requests.get(PING_URL, params={
        'records': len(records),
        'duration': int(duration),
        'source': 'production',
    })
```
Best Practices
- **Monitor end-to-end**: don't just ping on extract
- **Include validation**: ping only after quality checks pass
- **Track duration**: alert on unexpected slowness
- **Monitor record counts**: empty pipelines shouldn't ping
- **Separate environments**: use different monitors for prod and staging
- **Document dependencies**: know what relies on each pipeline
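Separating environments can be as simple as selecting the monitor URL from an environment variable, so a staging run can never silence the production monitor. A sketch; the tokens and the `PIPELINE_ENV` variable name are hypothetical:

```python
import os

# Hypothetical per-environment tokens - substitute your own
PING_URLS = {
    "production": "https://wizstatus.com/ping/PROD_TOKEN",
    "staging": "https://wizstatus.com/ping/STAGING_TOKEN",
}

def ping_url_for(env=None):
    # Default to staging so a misconfigured job never masks a prod failure
    env = env or os.environ.get("PIPELINE_ENV", "staging")
    return PING_URLS[env]
```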
Checklist
- All critical pipelines have monitors
- Grace periods account for maximum run time
- Quality validation before pinging
- Record count thresholds defined
- Duration thresholds defined
- Staging and production separated
- Alert channels configured appropriately
- Pipeline documentation updated