Apache Airflow est devenu le standard de facto pour l’orchestration de pipelines de donnees. Mais sa flexibilite peut devenir un probleme.
Les concepts fondamentaux
DAG : le pipeline, defini en Python. Chaque noeud est une tache. Task : une unite de travail. Un operateur Python, un script Bash, une requete SQL… Operator : le type de tache (PythonOperator, BashOperator, PostgresOperator…)
Un DAG bien structure
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from datetime import timedelta
@dag(
dag_id='etl_orders',
schedule_interval='0 2 * * *',
start_date=days_ago(1),
catchup=False,
default_args={
'retries': 2,
'retry_delay': timedelta(minutes=5),
'execution_timeout': timedelta(hours=1),
},
tags=['etl', 'orders'],
)
def etl_orders():
@task
def extract_orders() -> list:
from my_db import fetch_new_orders
return fetch_new_orders()
@task
def transform(orders: list) -> list:
return [enrich_order(o) for o in orders]
@task
def load(orders: list):
from my_warehouse import bulk_insert
bulk_insert('orders_fact', orders)
orders = extract_orders()
transformed = transform(orders)
load(transformed)
dag_instance = etl_orders()
Les erreurs classiques
catchup=True par defaut : si vous deployez un DAG avec une start_date ancienne, Airflow va essayer d’executer tous les runs manques. Toujours mettre catchup=False sauf besoin explicite.
XCom pour de grandes donnees : XCom stocke les donnees dans la DB Airflow. Passer un DataFrame de 500 Mo via XCom va crasher votre metadata DB. Utilisez S3/GCS comme intermediaire.
Logique metier dans les DAGs : le DAG definit l’orchestration, pas la logique. Importez vos fonctions depuis des modules Python.
Tester ses DAGs
from airflow.models import DagBag
def test_dag_loads():
dagbag = DagBag()
assert 'etl_orders' in dagbag.dags
assert dagbag.import_errors == {}
Notre formation Python pour l’automatisation inclut un module sur l’orchestration avec Airflow.