Comment développer un job de traitement de données à l’aide d’Apache Beam

Comment développer un job de traitement de données à l’aide d’Apache Beam

  • Alexey Romanenko
    Alexey Romanenko is Open Source Engineer in Talend (France) with more than 15 years of experience in software development. During his career, he has been working on different projects, like high-load web services, web search engines and cloud storage. Also, he developed and presented a course devoted to Hadoop/Cloud technologies for students. Recently, he joined the Apache Beam project as a new contributor. He spends his spare time with his family and he likes to play ice hockey.

Ce blog est la première partie d’une série d'articles de blog consacrés à Apache Beam.

Connaissez-vous Apache Beam ? Si ce n’est pas le cas, n’ayez pas honte : c’est l’un des derniers projets développés par l’Apache Software Foundation, dont la première version est sortie en 2016, donc relativement récente dans le monde du traitement de données. En fait, cela ne fait pas très longtemps que je travaille étroitement avec Apache Beam, que j’ai adoré apprendre à utiliser, en appréciant tout ses avantages.

Apache Beam est un modèle de programmation unifié qui propose un cadre simple pour implémenter les jobs de traitement de données par lots et en streaming, puis de les exécuter sur tous les moteurs en utilisant un ensemble d’E/S diversifié. Prometteur, mais un peu confus ? C’est la raison pour laquelle j’ai décidé de consacrer une série de blogs à Apache Beam. Dans cette publication, ainsi que dans les suivantes, je vous montrerai des exemples concrets et mettrai en évidence plusieurs cas d’usage de jobs de traitement de données utilisant Apache Beam.

Le sujet du jour est consacré au traitement par lots. Prenons l’exemple suivant : Vous travaillez pour un concessionnaire automobile et voulez analyser les ventes de voitures sur une période donnée (par exemple, combien de voitures de chaque marque ont été vendues ?). Cela signifie que notre dataset est délimité (volume de données précis) et qu’il ne sera pas modifié (ventes passées). Dans ce cas, nous pouvons nous reposer sur un processus par lots pour analyser nos données.

Comme données en entrée, nous disposons de journaux texte des voitures vendues au format suivant:

id,brand_name,model_name,sales_number

Par exemple :
1,Toyota,Prius,3
2,Nissan,Sentra,2
3,Ford,Fusion,4

Avant de commencer l’implémentation de notre première application Beam, nous devons prendre en compte certaines idées fondamentales qui seront utilisées plus tard. Il existe trois concepts principaux dans Beam : Pipeline, PCollection et PTransform.

  • Pipeline englobe le workflow de toutes les tâches de traitement de données, du début à la fin.
  • PCollection est une abstraction de datasets répartis utilisés par Beam pour transférer les données entre PTransforms.
  • PTransform est un processus qui fonctionne avec des données d’entrée (PCollection) et produit des données en sortie (PCollection). Généralement, les premiers et derniers PTransformssont des techniques d’entrée et de sortie des données qui peuvent être délimitées (traitement par lots) ou non délimitées (traitement en streaming).

Pour simplifier, nous pouvons considérer Pipeline comme un graphe orienté acyclique (DAG, Directed Acyclic Graph), qui représente votre workflow entier, les PTransforms comme des nœuds (qui transforment les données) et les PCollections comme les limites de ce graphique. Pour en savoir plus, lisez le Guide de programmation Beam.

Revenons à présent à notre exemple et essayons d’implémenter le premier pipeline, qui va traiter le dataset fourni.

Créer un pipeline

Commençons par créer un nouveau pipeline :

Pipeline pipeline = Pipeline.create();

Créez ensuite un nouveau PTransform à l’aide de la méthode pipeline.apply() , qui lira les données à partir du fichier texte et créera une nouvelle PCollection de chaînes. Pour ce faire, nous utilisons une des E/S déjà implémentées dans Beam : TextIO. TextIO nous permet de lire et d’écrire dans les fichiers texte, ligne par ligne. Elle possède de nombreuses autres fonctionnalités, comme travailler avec différents systèmes de fichiers, prendre en charge des schémas de fichiers et le streaming de fichiers. Pour en savoir plus, lisez la documentation d’Apache Beam.

apply(TextIO.read().from(/path/to/input/file))

La sortie de ce PTransform est une nouvelle instance de PCollection<Chaîne> où chaque entrée de la collection est une ligne de texte du fichier d’entrée.

Comme nous voulons obtenir le nombre total des ventes par marque, nous devons les regrouper. Par conséquent, la prochaine étape consiste à analyser chaque ligne et à créer une paire clé/valeur, où la clé sera un nom de marque et la valeur le nombre de ventes. Il est utile de mentionner que la PCollection en sortie d’un PTransform précédent sera l’entrée de la PCollection de celui-ci.

Pour cette étape, nous utilisons un PTransform interne à Beam, appelé MapElements, pour créer une nouvelle paire de clé/valeurs pour chaque entrée, en utilisant l’implémentation fournie de l’interface SimpleFunction.

Nous regroupons ensuite le nombre de ventes par marque à l’aide d’une autre transformation de Beam : GroupByKey. Comme résultat de sortie nous avons une PCollection de clé/valeurs, où la clé est le nom de la marque et les valeurs une collection itérable de ventes pour cette marque.

.apply(GroupByKey.<String, Integer>create())

 

Nous sommes maintenant prêts à additionner le nombre de ventes de voitures par marque à l’aide de notre propre implémentation de la transformation ParDo :

Pour finaliser le pipeline, nous appliquons une autre transformation d’E/S pour prendre la PCollection de chaînes et les écrire dans un fichier texte :

.apply(TextIO.write().to(/path/to/output/dir).withoutSharding());

En dernier lieu, nous allons exécuter notre pipeline ainsi créé :

pipeline.run();

Ça paraît facile, non ? C’est toute la puissance d’Apache Beam, qui nous permet de créer des pipelines de traitement de données complexes avec un minimum de code.

Ceux qui connaissent bien Hadoop ont pu remarquer que ce pipeline ressemble à ce qui suit :

  • Il lit et analyse les données texte ligne par ligne et crée de nouvelles paires clé/valeurs (Map)
  • Il regroupe ensuite ces paires par clé (GroupBy)
  • Enfin, il itère toutes les valeurs d’une clé en leur appliquant une fonction utilisateur (Reduce)

Oui, vous avez bien lu, ce simple pipeline peut être exécuté avec un job MapReduce classique ! Mais, juste pour montrer la simplicité et la clarté de Beam (même si c’est du Java !) et si nous décidions d’étendre nos pipelines en ajoutant une autre transformation, il ne deviendrait pas pour autant plus compliqué.

Créer et exécuter un pipeline

Comme mentionné plus haut, un pipeline Beam peut être exécuté sur différents exécuteurs (moteurs de traitement) :

  • Direct Runner
  • Apache Apex
  • Apache Flink
  • Apache Gearpump
  • Apache Spark
  • Google Cloud Dataflow

Pour ce faire, nous avons juste besoin d’ajouter une dépendance correspondante à notre configuration de projet maven ou gradle. La bonne nouvelle est que nous n’avons pas à régler ou réécrire le code du pipeline pour qu’il s’exécute sur chaque exécuteur. Encore mieux : nul besoin de recompiler nos jars si la dépendance de tous les exécuteurs requis a été incluse ; il nous suffit de choisir un exécuteur, et c’est tout !

Direct Runner est un exécuteur local en général utilisé pour tester notre pipeline. Avec Java, vous devez spécifier votre dépendance sur Direct Runner dans votre pom.xml.


<dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-runners-direct-java</artifactId>
   <version>2.3.0</version>
   <scope>runtime</scope>
</dependency>


Ensuite, vous devez compiler votre projet :
# mvn clean package

Exécutez ensuite votre pipeline sur Direct Runner :
# mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.SalesPerCarsBrand -Pdirect-runner -Dexec.args="--runner=DirectRunner”

Par exemple, si notre fichier d’entrée contient les données suivantes :
# cat /tmp/beam/cars_sales_log
1,Toyota,Prius,3
2,Nissan,Sentra,2
1,Toyota,Yaris,4
3,Ford,Fusion,5
3,Ford,Kuga,3

Le résultat final ressemblerait à ceci :
# cat /tmp/beam/cars_sales_report
Toyota: 7
Nissan: 2
Ford: 8

La liste de tous les exécuteurs pris en charge et les instructions d’utilisation se trouvent sur cette page.

Enfin, tout le code de cet exemple est disponible dans ce référentiel GitHub : https://github.com/aromanenko-dev/beam-tutorial.

Dans la prochaine partie de cette série d'articles de blogs, je parlerai du traitement de données en streaming dans Beam. Je prendrai un autre exemple de tâches d’analyse de données avec une source de données non limitée et nous verrons ce que Beam nous propose dans ce cas.

Participer aux discussions

0 Comments

Laisser un commentaire

Your email address will not be published. Required fields are marked *