Les pipelines ETL (Extract, Transform, Load) sont critiques pour les organisations data-driven. Quand ils échouent silencieusement, les rapports, dashboards et décisions en aval en souffrent. Voici comment implémenter un monitoring complet pour vos pipelines de données.
Le Défi du Monitoring ETL
Les échecs ETL sont particulièrement insidieux car :
- Les données semblent normales - Les anciennes données s'affichent encore dans les rapports
- Pas d'erreurs immédiates - Les utilisateurs ne voient pas de pages d'erreur
- Découverte tardive - Souvent trouvé des jours plus tard
- Impact en cascade - Analytics, modèles ML, rapports tous affectés
- Dépendances complexes - Plusieurs étapes peuvent échouer indépendamment
Quoi Monitorer dans l'ETL
Completion du Pipeline
Le pipeline entier s'est-il terminé ?
def executer_pipeline():
extract()
transform()
load()
# Signaler la completion
requests.get(PING_URL)
Completion par Étape
Monitorez chaque étape indépendamment :
def executer_pipeline():
extract()
requests.get(f"{PING_URL}/extract")
transform()
requests.get(f"{PING_URL}/transform")
load()
requests.get(f"{PING_URL}/load")
Nombre d'Enregistrements
Alertez si moins d'enregistrements que prévu :
def executer_pipeline():
records = extract()
if len(records) < MINIMUM_ATTENDU:
# Ne pas pinguer - données suspectes
alert(f"Seulement {len(records)} enregistrements extraits")
return
transform(records)
load(records)
requests.get(PING_URL)
Durée du Pipeline
Alertez si trop long :
import time
def executer_pipeline():
start = time.time()
extract()
transform()
load()
duration = time.time() - start
requests.get(f"{PING_URL}?duration={duration}")
if duration > DUREE_MAX:
alert(f"Pipeline a pris {duration}s")
Monitoring de Différents Outils ETL
Apache Airflow
Utilisez les callbacks Airflow :
from airflow import DAG
from airflow.operators.python import PythonOperator
import requests
def notifier_succes(context):
requests.get("https://wizstatus.com/ping/TOKEN")
def notifier_echec(context):
requests.get("https://wizstatus.com/ping/TOKEN/fail")
dag = DAG(
'etl_quotidien',
schedule_interval='0 2 * * *',
on_success_callback=notifier_succes,
on_failure_callback=notifier_echec
)
dbt (Data Build Tool)
Encapsulez les commandes dbt :
#!/bin/bash
dbt run --target prod
if [ $? -eq 0 ]; then
curl https://wizstatus.com/ping/dbt-run
fi
Prefect
Utilisez les handlers d'état :
from prefect import flow
import requests
@flow(on_completion=[notifier_complete])
def pipeline_etl():
extract()
transform()
load()
def notifier_complete(flow, flow_run, state):
if state.is_completed():
requests.get("https://wizstatus.com/ping/TOKEN")
Checks de Qualité du Pipeline
Au-delà de la completion, vérifiez la qualité des données :
Validation du Nombre de Lignes
def valider_et_pinguer():
count = db.execute("SELECT COUNT(*) FROM table_cible")[0][0]
if count < LIGNES_MINIMUM:
alert(f"Seulement {count} lignes dans la cible")
return
requests.get(PING_URL)
Check de Fraîcheur
def valider_fraicheur():
latest = db.execute("""
SELECT MAX(updated_at) FROM table_cible
""")[0][0]
if latest < datetime.now() - timedelta(hours=1):
alert("Données obsolètes")
return
requests.get(PING_URL)
Bonnes Pratiques
- Monitorez de bout en bout - Ne pingez pas seulement sur l'extraction
- Incluez la validation - Pingez seulement après les checks de qualité
- Suivez la durée - Alertez sur les lenteurs inattendues
- Monitorez les compteurs - Les pipelines vides ne devraient pas pinguer
- Séparez les environnements - Moniteurs différents pour prod/staging
- Documentez les dépendances - Sachez ce qui dépend de chaque pipeline
Checklist
- Tous les pipelines critiques ont des moniteurs
- Les périodes de grâce prennent en compte la durée max d'exécution
- Validation de qualité avant le ping
- Seuils de nombre d'enregistrements définis
- Seuils de durée définis
- Staging et production séparés
- Canaux d'alerte configurés de façon appropriée
- Documentation du pipeline mise à jour