Comment développer un job de traitement de données à l’aide d’Apache Beam – Pipelines de streaming

Comment développer un job de traitement de données à l’aide d’Apache Beam – Pipelines de streaming

  • Alexey Romanenko
    Alexey Romanenko is Open Source Engineer in Talend (France) with more than 15 years of experience in software development. During his career, he has been working on different projects, like high-load web services, web search engines and cloud storage. Also, he developed and presented a course devoted to Hadoop/Cloud technologies for students. Recently, he joined the Apache Beam project as a new contributor. He spends his spare time with his family and he likes to play ice hockey.

Dans notre dernier article, nous avons évoqué le développement de jobs de traitement de données à l’aide d’Apache Beam. Nous abordons dans cet article l’une des questions les plus fréquentes du monde des big data modernes : le traitement des données en streaming.

La principale différence entre le traitement des données par lots et en streaming est le type de source des données entrantes. Lorsque votre dataset est limité (même s’il est très volumineux) et qu’il n’est pas mis à jour parallèlement au traitement, un pipeline par lots convient parfaitement. Dans ce cas, les données peuvent provenir de fichiers, de tables de bases de données, d’objets dans des stockages d’objets, etc. Il faut souligner que, pour le traitement par lots, les données doivent être immuables durant tout le traitement et que le nombre d’enregistrements d’entrée doit être constant. Pourquoi est-ce si important ? Car même avec des fichiers, le flux de données peut être illimité si des fichiers sont constamment ajoutés ou modifiés. En pareil cas, nous devons adopter une approche du traitement des données en streaming. Si nous savons que nos données sont limitées et immuables, nous pouvons développer un pipeline par lots.

Les choses se compliquent si notre dataset est illimité (lorsque des données arrivent en continu) ou variable. Les sources peuvent alors être des systèmes de messages (comme Apache Kafka), des nouveaux fichiers dans un dépôt (journaux de serveurs web) ou tout autre système collectant des données en temps réel (capteurs IoT par exemple). Le point commun à toutes ces sources est qu’elles nous imposent de devoir toujours attendre de nouvelles données. Bien sûr, nous pouvons diviser nos données en lots, par période ou par taille de données, et traiter chacune des parties par lots. Mais il serait alors difficile d’appliquer certaines fonctions à tous les datasets et de créer un pipeline global. Heureusement, plusieurs moteurs de streaming nous permettent de gérer ce type de traitement de données facilement : Apache Spark, Apache Flink, Apache Apex ou Google DataFlow. Ils sont tous compatibles avec Apache Beam et nous pouvons exécuter le même pipeline sur différents moteurs sans changement de code. De plus, nous pouvons utiliser le même pipeline en mode streaming ou par lots, avec très peu de changements. Il suffit de définir correctement la source d’entrée, et tout fonctionne correctement. Comme par magie ! J’en rêvais à l’époque où je devais réécrire mes jobs par lots pour les transformer en jobs de streaming.

Voilà pour la théorie, passons maintenant à la pratique avec un exemple de code de streaming. Nous allons lire des données depuis Kafka (source illimitée), effectuer certaines actions simples de traitement des données, puis écrire les résultats dans Kafka.

Supposons que nous avons un flux illimité de coordonnées géographiques (X et Y) d’objets sur une carte (disons par exemple des voitures). Ces coordonnées arrivent en temps réel, et nous voulons sélectionner uniquement les objets qui sont situés dans une zone spécifique. En d’autres termes, nous devons consommer des données textuelles d’un topic Kafka, les analyser et les filtrer selon des limites spécifiées, puis écrire dans un autre topic Kafka. Voyons comment faire tout ceci grâce à Apache Beam.

Chaque message Kafka contient des données textuelles au format suivant :
id,x,y

où :
  id = identifiant unique de l’objet,
x, y = coordonnées sur la carte (nombres entiers).

Si le format est incorrect, nous devons en tenir compte et ignorer les enregistrements correspondants.

Créer un pipeline

Nous créons un pipeline de la même manière que dans notre article précédent, où nous effectuions un traitement par lots :

Pipeline pipeline = Pipeline.create(options);

Nous pouvons développer l’objet Options pour passer des options de ligne de commande dans le pipeline. Pour en savoir plus, consultez l’exemple complet sur GitHub.

Nous devons ensuite lire les données depuis le topic d’entrée Kafka. Comme indiqué précédemment, Apache Beam offre différents connecteurs d’entrées/sorties, dont KafkaIO. Nous créons donc une nouvelle commande PTransform illimitée qui consomme les messages arrivant du topic Kafka spécifié et les propage à l’étape suivante :

pipeline.apply(
    KafkaIO.read()
        .withBootstrapServers(options.getBootstrap())
        .withTopic(options.getInputTopic())
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class))

Par défaut, KafkaIO encapsule tous les messages consommés dans un objet KafkaRecord. Cependant, la transformation suivante récupère uniquement une charge utile (valeurs de chaînes) par l’objet DoFn nouvellement créé :

.apply(
    ParDo.of(
        new DoFn, String>() {
            @ProcessElement
            public void processElement(ProcessContext processContext) {
                KafkaRecord record = processContext.element();
                processContext.output(record.getKV().getValue());
            }
        }
    )
)

Il est maintenant temps de filtrer les enregistrements, comme indiqué ci-dessus. Avant cela, nous devons analyser notre valeur de chaîne selon le format défini. Cela permet de l’encapsuler dans un objet fonctionnel qui sera ensuite utilisé par la transformation interne Beam Filter.

.apply(
    "FilterValidCoords",
    Filter.by(new FilterObjectsByCoordinates(
        options.getCoordX(), options.getCoordY()))
)

Nous devons ensuite préparer les messages filtrés, pour qu’ils puissent être écrits dans Kafka. Pour cela, créons une nouvelle paire clé/valeur à l’aide de la classe Beam interne KV, qui peut être utilisée sur différents connecteurs d’entrées/sorties, dont KafkaIO.

.apply(
    "ExtractPayload",
    ParDo.of(
        new DoFn>() {
           @ProcessElement
           public void processElement(ProcessContext c) throws Exception {
                c.output(KV.of("filtered", c.element()));
           }
        }
    )
)

La transformation finale est nécessaire pour écrire les messages dans Kafka, nous utilisons donc simplement KafkaIO.write() (implémentation de récepteur) à cet effet. En ce qui concerne la lecture, nous devons configurer cette transformation avec certaines options obligatoires, comme les serveurs d’amorçage Kafka, le nom du topic de sortie et les sérialiseurs de clé/valeur.

.apply(
    "WriteToKafka",
    KafkaIO.write()
        .withBootstrapServers(options.getBootstrap())
        .withTopic(options.getOutputTopic())
        .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class)
        .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class)
);

Enfin, nous exécutons notre pipeline comme d’habitude :

pipeline.run();

Cela peut sembler un peu plus compliqué que ce qui a été décrit dans notre article précédent, mais comme vous pouvez facilement le constater, nous n’avons effectué aucune action spécifique pour que notre pipeline soit compatible avec le traitement en streaming. Cette compatibilité est assurée par l’implémentation du modèle de données Apache Beam, ce qui permet aux utilisateurs Beam de passer très facilement d’un traitement par lots à un traitement en streaming.

Créer et exécuter un pipeline

Ajoutez les dépendances nécessaires pour pouvoir utiliser KafkaIO Beam :

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.4.0</version>
</dependency>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>1.1.0</version>
</dependency>

Ensuite, créez un fichier jar et exécutez-le avec DirectRunner pour tester son fonctionnement :

# mvn clean package
# mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.FilterObjects -Pdirect-runner -Dexec.args="--runner=DirectRunner"

Si nécessaire, nous pouvons ajouter d’autres arguments utilisés dans le pipeline à l’aide de l’option exec.args. N’oubliez pas de vous assurer que vos serveurs Kafka sont disponibles et correctement spécifiés avant d’exécuter le pipeline Beam. Enfin, la commande Maven lancera un pipeline et l’exécutera jusqu’à ce qu’il soit arrêté manuellement (il est également possible de définir une durée d’exécution maximale). Cela signifie que les données seront traitées en continu, en mode streaming.

Comme d’habitude, tout le code de cet exemple est disponible dans ce dépôt GitHub.

Bon streaming !

Participer aux discussions

0 Comments

Laisser un commentaire

Your email address will not be published. Required fields are marked *