
Apache Airflow est un système de gestion de flux de travail ETL (Extraction, Transformation, Chargement) beaucoup utilisé pour la transformation de données. Il est codé en python et les workflow sont écrits via des scripts pythons.
L’utilisation de python permet au développeur de créer facilement des workflow grâce à l’importation des classes et des bibliothèques python.
Les workflow sont organisés et exprimés sous forme de Directed Acyclic Graphs (DAG). En réalité, il s’agit de la méthode choisie pour réaliser une tâche lire plus.
# Installer Apache Airflow
Avant de passer à l’étape suivante rassurer vous d’installer python sur votre machine. Si python n’est pas installer veuillez vous référé à cet article sur comment installer python ici.
#Etape 1 :
export AIRFLOW_HOME=~/airflow
#Etape 2
# Installez apache airflow grâce à la commande suivante
pip3 install apache-airflow
#Etape 3
# Pour exécuter les workflows et les maintenir, airflow a besoin d’un backend de base de données. Tapez la commande suivante :
airflow db init
Si vous obtenez un "Initialization done", félicitations vous avez réussi à installer Apache airflow. Si vous avez rencontrer des problèmes durant l'installation veuillez vous référer aux articles suivants: ceci ou ceci ou à la vidéo youtube suivante.
# Lancer apache airflow
#Etape 4
Lancez le serveur web grâce à la commande ci-après. Le port par défaut est 8080. Modifiez-le s’il est déjà utilisé. Bien avant de lancer le serveur il faut ajouter le port utilisé aux ports autorisé au niveau du pare-feu. Lancer les commandes suivantes les unes après les autres.
#8080 represente le port par defaut, le port qu'on utilise dans ce tutoriel.
sudo systemctl start firewalld
sudo systemctl enable firewalld
sudo firewall-cmd --add-port=8080/tcp --permanent
sudo firewall-cmd --reload
airflow webserver -p 8080
Maintenant, créez un dossier “dags” dans le répertoire airflow où vous définirez vos workflows ou vos DAG. Ouvrez le navigateur Web et tapez : http://localhost:8080/.
Vous verrez une affichage comme ceci:

Si vous utilisez docker le username et le mot de passe sont : airflow && airflow
SI vous êtes en local vous devez creér un utulisateur administrateur comme suit:
airflow users create --role Admin --username admin --email admin@gmail.com --firstname peter --lastname parker --password admin
Redémarrer votre serveur web et connecter vous avez vos identifiants. Vous verrez une affichage comme suit:

# Créer votre premier DAG
Créer un dossiers nommer “dags” puis un fichier “my_dag.py” à l’interieur.
mkdir dags # Creer un dossier
cd dags # Entrer dans le dossier
touch my_first_dag.py # Creer un fichier
vi my_first_dag.py # Editer le fichier
Tapez sur la lettre i pour commencer à modifier le fichier.
Copiez coller dans votre fichier le code suivant :
# Importation des bibliothèques
Vous pouvez créer le même workflow à l’aide d’Apache Airflow. Mais le code sera complètement en python pour définir un DAG. Pour cela, commencez par importer les bibliothèques nécessaires. Dans ce cas, c’est la bibliothèque BashOperator qui sera utilisée, car notre flux de travail nécessite uniquement l’exécution des opérations Bash.
from airflow import DAG
from datetime import datetime
from random import randint
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
# Définition du DAG et ses arguments
Pour créer un dag vous devez créer un objet en lui passant différent paramètre a savoir le dag_id et des paramètres propres a ce que vous comptez faire. Veuillez vous référer a cet article pour une autre méthode de définition de dag https://www.data-transitionnumerique.com/apache-airflow/#h-d-finition-du-dag
with DAG (
"my_first_dag",
start_date=datetime(2022, 1, 1),
schedule_interval='@hourly',
catchup=False
) as dag:
# Définition des tâches
Pour cet airflow il faut deux tâches :
print_me: dans la première tâche, il faut imprimer « Je suis Axe-Tag l’assistant virtuel de Axe-tag.com depuis plus de : » sur le terminal à l’aide de la commande echo.
nbr_year : dans celle-ci, il faut imprimer le nombre d’anée qu’a fait Axe-Tag en soustrayant son année de création 2019 a aujourdhui.
Maintenant, tout en définissant la tâche, vous devez d’abord choisir le bon opérateur. Ici, les deux commandes sont basées sur une opération Python. C’est le PythonOperator qui sera utilisé. Le task_id sera un identifiant unique de la tâche visible sur les nœuds de Graph View de votre DAG. Passez la commande Python que vous souhaitez exécuter et enfin l’objet DAG auquel vous souhaitez lier cette tâche.
Enfin, créez le pipeline en ajoutant l’opérateur « >> » entre les tâches :
#Fonction appeler par la première tâche
def _who_am_i():
string = "Je suis Axe-Tag l'assistant virtuel de Axe-tag.com depuis plus de :"
return string
#Fonction appeler par la deuxième tâche
def _nbr_year():
current_date = datetime.today()
return current_date.year - 2000
#Premiere tâche
t1 = PythonOperator(
task_id="t1",
python_callable=_who_am_i
)
Deuxieme tâche
t2 = PythonOperator(
task_id="t2",
python_callable=_nbr_year
)
#Liaison des tâches. Il s'agit ici de définir l'ordre d'exécution des taches
t1 >> t2
Ci-dessous tout le code.
from airflow import DAG
from datetime import datetime
from random import randint
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
with DAG (
"my_first_dag",
start_date=datetime(2022, 1, 1),
schedule_interval='@hourly',
catchup=False
) as dag:
def _who_am_i():
string = "Je suis Axe-Tag l'assistant virtuel de Axe-tag.com depuis plus de :"
return string
def _nbr_year():
current_date = datetime.today()
return current_date.year - 2000
t1 = PythonOperator(
task_id="t1",
python_callable=_who_am_i
)
t2 = PythonOperator(
task_id="t2",
python_callable=_nbr_year
)
t1 >> t2
# Mettre à jour le DAGS dans l’interface utilisateur Web
Maintenant, il faut actualiser l’interface utilisateur et vous verrez votre DAG dans la liste. Activez la bascule à gauche de chacun des DAG, puis déclenchez le DAG.


Activer le DAG.


2 Comments
Seth Houndete · May 22, 2022 at 7:09 pm
Documentation très explicite , merci
mouhamed · June 6, 2022 at 9:03 pm
Merci pour cette documentation