Apache Spark et Talend : performances et mise au point

Apache Spark et Talend : performances et mise au point

  • Petros Nomikos
    I have 3 years of experience with installation, configuration, and troubleshooting of Big Data platforms such as Cloudera, MapR, and HortonWorks. I joined Talend in 2014, and prior to Talend I held positions as manager of technical support, and project manager for data warehouse implementations. In my current role, I assist companies in understanding how to implement Talend in their Big Data Ecosystem.

Je voudrais commencer par remercier tous les lecteurs de mes deux précédents articles traitant de Talend et Apache Spark.

Si vous venez de rejoindre cette série d'articles, vous pouvez lire les articles précédents ici : Talend and Apache Spark: A Technical Primer, et la 2e ici : Talend vs. Spark Submit Configuration: What’s the Difference?.

Les deux premières publications de ma série portant sur Apache Spark incluent une vue d’ensemble du fonctionnement de Talend avec Spark, les similitudes entre Talend et Spark Submit, et les options de configuration disponibles pour les jobs Spark dans Talend.

Dans cet article, nous allons nous pencher sur les performances et la mise au point d’Apache Spark. C’est un sujet couramment débattu par tous ceux qui utilisent Apache Spark, même sans Talend. Lorsque vous développez et exécutez vos premiers jobs Spark, vous vous posez probablement les questions suivantes :

  • Combien d’exécuteurs dois-je allouer à mon job Spark ?
  • De quel volume de mémoire a besoin chaque exécuteur ?
  • Combien de cœurs dois-je utiliser ?
  • Pourquoi certains jobs Spark prennent des heures pour traiter 10 Go de données et comment résoudre ce problème ?

Dans ce blog, je vais traiter toutes ces questions et fournir des réponses et des informations exploitables. Avant d’aller plus loin, je vais présenter quelques concepts fondamentaux qui seront utilisés dans cet article :

Partition : Une partition est une partie d’un dataset distribué. Elle est créée par la taille de bloc HDFS par défaut. Spark utilise les partitions pour effectuer le traitement parallèle de datasets.

Tâches : Les tâches sont des unités de travail pouvant être exécutées au sein d’un exécuteur.

Cœur : Un cœur est l’unité de traitement d’un processeur qui détermine le nombre de tâches parallèles de Spark pouvant être exécutées au sein d’un exécuteur.

Exécuteur : Processus démarré sur des nœuds de travail et qui exécute votre soumission de job en mémoire ou sur disque.

Référence d’application : Chaque application YARN lance un processus principal d’application qui a la responsabilité de demander des ressources au gestionnaire de ressources. Une fois les ressources allouées, le processus collabore avec les gestionnaires de nœuds pour lancer les conteneurs requis en leur sein.

Mise au point Spark

Pour commencer, voyons comment vous pouvez mettre au point vos jobs Apache Spark dans Talend. Comme indiqué plus haut, dans votre job Talend Spark, vous trouverez l’onglet Spark Configuration (Mise au point Spark) dans lequel vous pouvez définir les propriétés de mise au point. Cette option est toujours désactivée par défaut dans Talend.

Dans cette section, vous avez l’option de définir la mémoire et les cœurs que votre référence d’application et vos exécuteurs utiliseront, et combien d’exécuteurs seront requis par votre job. La principale question qui se pose lorsque vous commencez à renseigner les valeurs de cette section est la suivante : « Comment déterminer le nombre de cœurs ou la mémoire requis par ma référence d’application ou mes exécuteurs pour atteindre de bonnes performances ? » Réfléchissons à cette question.

Comment choisir le nombre de cœurs pour votre job Spark

À ce stade, certains facteurs doivent être pris en compte avant de continuer. Les voici :

  1. La taille de vos datasets
  2. Le délai d’achèvement de votre job
  3. Les opérations et actions effectuées par votre job

Avec ces facteurs à l’esprit, nous pouvons commencer à configurer notre job pour en optimiser les performances. Commençons par mettre au point notre référence d’application. Nous pouvons utiliser les valeurs par défaut, car la référence ne fait qu’orchestrer les ressources sans effectuer de traitement, ce qui signifie qu’elle n'a pas besoin de valeurs élevées en termes de mémoire ou de cœurs.

Notre prochaine étape consiste à configurer la mémoire et les cœurs de nos exécuteurs. La principale question porte sur le nombre d’exécuteurs et de cœurs, ainsi que le volume de mémoire, à utiliser. Pour trouver la réponse, imaginons que nous avons un cluster Hadoop avec 6 nœuds de travail, chacun ayant 32 cœurs et 120 Go de mémoire. À première vue, plus nous avons de tâches concurrentes par exécuteur, meilleures seront les performances. En faisant des recherches, par exemple dans les guides de mise au point de Hadoop comme Cloudera (ce lien), il a été démontré qu’avec plus de 5 cœurs par exécuteur, les performances d’E/S HDFS sont mauvaises. Par conséquent, 5 cœurs est la valeur maximale pour les performances.

Ensuite, voyons combien d’exécuteurs nous voulons lancer. En fonction du nombre de cœurs et de nœuds, nous pouvons facilement déterminer ce nombre. Comme indiqué, il convient de ne pas dépasser 5 cœurs par exécuteur. Des 32 cœurs que nous avons par nœud, nous devons retirer ceux que nous ne pouvons pas utiliser pour nos jobs, car ils seront utilisés par le système d’exploitation et les dameons Hadoop s’exécutant sur le nœud. L’outil de gestion du cluster Hadoop s’occupe de cette allocation des cœurs, ce qui nous facilite la tâche de savoir combien de nœuds sont disponibles pour nos jobs Spark.

Après avoir effectué le calcul, supposons qu’il nous reste 30 cœurs disponibles par noeud. Comme nous avons déjà déterminé que 5 cœurs est le nombre maximal par exécuteur, nous savons que nous pouvons dédier jusqu’à 6 exécuteurs par nœud. Facile !

Enfin, terminons en calculant le volume de mémoire utilisable. En fonction des spécifications matérielles ci-dessus, nous voyons que chaque nœud dispose de 120 Go de mémoire, mais comme je l’ai déjà expliqué plus haut, nous ne pouvons pas utiliser toute cette mémoire pour les jobs, car le système d’exploitation en a aussi besoin. Ici aussi, l'outil de gestion du cluster Hadoop peut déterminer le volume de mémoire qui peut être dédié à nos jobs. Si le système d’exploitation et les daemons Hadoop nécessitent 2 Go de mémoire, il nous reste donc 118 Go de mémoire pour nos jobs Spark. Comme nous avons déjà déterminé que nous pouvons avoir 6 exécuteurs par nœud, le calcul nous montre que nous pouvons dédier environ 20 Go par exécuteur. Cela n’est cependant pas vrai à 100 %, car nous devons aussi prendre en compte la mémoire utilisée par chaque exécuteur. Dans mon précédent article, j’ai mentionné que cette mémoire s’élevait par défaut à 384 Mo. Si je retire ce volume de mémoire des 20 Go, je peux alors dédier un maximum de 19 Go à chaque exécuteur.

Allocation dynamique ou fixe des ressources cluster

Les chiffres ci-dessus s’appliquent aux deux types d’allocation (fixe et dynamique) des ressources cluster dans un job Spark. La différence entre les deux est l’allocation dynamique. Avec une allocation dynamique, vous pouvez spécifier le nombre initial d’exécuteurs utilisés, un minimum d’exécuteurs que peut utiliser le job quand la charge n’est pas élevée, et un nombre maximum lorsque plus de puissance de traitement est nécessaire. Bien qu’il serait intéressant de pouvoir utiliser toutes les ressources du cluster pour notre job, nous devons partager cette puissance de traitement avec les autres jobs qui s’exécutent sur le cluster. Par conséquent, en fonction de ce que nous avons identifié dans nos exigences lorsque nous avons pris en compte les facteurs définis plus haut pour la mise au point de notre job Talend Spark, cela déterminera le pourcentage de ces valeurs maximales que nous pouvons effectivement utiliser.

Notre job étant configuré, nous pouvons maintenant l’exécuter ! Supposons que notre job Spark prend quand même beaucoup de temps, même configuré avec les paramètres optimaux définis ci-dessus. Nous devons revenir à notre configuration et examiner quelques autres paramètres pour nous assurer qu’ils sont configurés pour des performances optimales.

Performances Spark

Tout d’abord, supposons que nous joignons deux tables dans notre job Spark. L’un des facteurs pris en compte avant de commencer à optimiser nos jobs Spark était la taille de nos jeux de données. Lorsque nous nous penchons sur la taille des tables et que nous déterminons que l’une d’entre elles fait 50 Go et l’autre 100Mo, nous devons voir si nous optimisons au mieux les composants Talend des jointures répliquées.

Jointure répliquée

Une jointure répliquée (appelée également jointure « Map-Side ») est fréquemment utilisée lorsque vous joignez une grande table avec une table plus petite pour diffuser les données de la plus petite des tables à tous les exécuteurs. Dans ce cas, comme le jeu de données plus petit peut tenir dans la mémoire, nous pouvons utiliser une jointure répliquée pour la diffuser à chaque exécuteur et optimiser ainsi les performances de notre job Spark.

Comme les données de la table doivent être combinées au niveau de l’exécuteur avec les données secondaires, en envoyant le jeu de données réduit à tous les exécuteurs, nous évitons que les données de la grande table soient transmises sur le réseau. Très souvent, les problèmes de performances affectant Spark sont dus au transfert d’importants volumes de données sur le réseau. Cela est facilement vérifiable au niveau du travail Talend, en activant l’option « Use replicated join » (Utiliser la jointure répliquée) dans le composant tMap, comme indiqué ci-dessous. Les données de la table de recherche seront envoyées à tous les exécuteurs.

 

La prochaine étape consiste à voir si notre job inclut des opérations qui effectuent des calculs gourmands.

Cache Spark

Afin de suivre les recalculs, prenons comme simple exemple le chargement d’un fichier contenant des données d’achat de clients. À partir de ces données, nous voulons capturer quelques indicateurs :

  • le nombre total de clients,
  • le nombre de produits achetés.

Dans ce cas, si nous n’utilisons pas de cache Spark, chaque opération ci-dessus chargera les données. Cela affectera nos performances, car un lourd recalcul se produira. Comme nous savons que ce jeu de données devra être utilisé plus tard par le job, il est conseillé d’utiliser le cache Spark pour le placer en mémoire pour un usage ultérieur pour nous éviter de la charger à nouveau.

Dans le cadre de nos jobs Talend Spark, cela s’effectue avec les composants tCacheIn et tCacheOut, disponibles dans la palette Apache Spark dans Talend, vous permettant d’utiliser le mécanisme de mise en cache de Spark, avec les différentes options disponibles.

Vous pouvez également choisir de mettre les données en cache uniquement sur disque, et vous avez l’option de sérialiser les données en cache en mémoire, sur disque, ou les deux. Enfin, vous pouvez aussi choisir de répliquer les données mises en cache sur deux autres nœuds. L'option la plus fréquemment utilisée est la mémoire, sans sérialisation, car c’est la plus rapide, mais sachant que le RDD mis en cache ne peut pas tenir dans la mémoire et que nous ne voulons pas qu’il déborde sur le disque, la sérialisation est préférable, car elle réduit l’espace consommé par le jeu de données ; cela se fait cependant au détriment des performances en raison de la surcharge occasionnée. Par conséquent, vous devez évaluer vos options et choisir celle la mieux adaptée à vos besoins.

 

Si vous rencontrez encore des problèmes de performances, il convient de se tourner vers l’interface Web de l’historique Spark pour comprendre ce qu’il se passe. Comme mentionné dans mon article précédent, vous pouvez activer la journalisation Spark dans la section Spark History de Spark Configuration dans Talend. La journalisation Spark vous aide à rechercher et résoudre les problèmes des jobs Spark en conservant les journaux après la fin des jobs et en les rendant disponibles dans l’interface Web Spark History. Avec la journalisation des événements Spark activée, vous nous permettrez de résoudre plus facilement les problèmes de performances.

 

Vous pouvez consulter l’interface Web Spark History, qui comporte plusieurs onglets concernant le numéro d’application pour notre job :

Dans l’interface utilisateur de Spark ci-dessus, nous allons regarder l’onglet Stages, identifier l’étape qui affecte les performances de notre job, accéder à ses détails et vérifier si nous voyons quelque chose qui ressemble au comportement suivant :

 

Nous constatons qu’une seule étape traite la plupart des données et que les autres sont inactives, même après avoir alloué 10 exécuteurs. Pourquoi ce phénomène ? Pour répondre à cette question, nous devons identifier l’étape du travail ou se produit le problème. Comme exemple, nous notons que cela se produit dans la partie du job Spark où nous lisons les données depuis un fichier compressé. Étant donné que les fichiers d’archive ne sont pas partitionnés par défaut à la lecture, un RDD avec une seule partition va être créé pour chaque fichier d'archive que nous lisons, ce qui provoquera ce comportement. Si ce fichier compressé est un fichier d’archive qui peut être divisé comme BZIP2, et partitionné à la lecture, dans les paramètres avancés (Advanced settings) de tFileInputDelimited, nous pouvons activer la propriété « Set Minimum Partitions » (Définir les partitions minimales), puis au minimum définir autant de partitions qu’il y a d’exécuteurs comme point de départ.

Mais, dans le cas d’un fichier d’archive comme GZIP qui ne peut pas être repartitionné à la lecture, nous pouvons le partitionner explicitement à l’aide de notre composant tPartition. Ce composant, comme montré ci-dessous, nous permet de repartitionner le fichier pour que puissions répartir la charge de manière égale entre les exécuteurs.

Le partitionnement à la lecture peut également servir lors de la lecture d’une base de données avec nos composants tJDBC, avec les propriétés suivantes :

Le repartitionnement ne peut être appliqué que dans certaines situations, comme illustré ci-dessus. Si nous déterminons que notre jeu de données est asymétrique sur les clés que nous utilisons pour la jointure, d’autres méthodes doivent être envisagées. Alors, comment identifier l’asymétrie des données ? Commencez par regarder le jeu de données par partition et voyez comment les données sont regroupées parmi nos clés que nous utilisons pour la jointure. Voici un exemple de jeu de données asymétrique par partition :

Dans ce cas, si nous ne pouvons pas repartitionner par une autre clé, nous devrions envisager d’autres méthodes pour améliorer notre job Spark. Une technique fréquemment utilisée est celle dite du « salting ». Avec le salting, vous ajoutez une clé fictive à votre clé réelle pour équilibrer la répartition des données par partition. Cela peut se faire par le biais de notre composant tmap dans un job Spark, par exemple comme ceci :

Comme nous pouvons le voir ci-dessus, nous ajoutons au niveau tmap la clé fictive en tant que nombre aléatoire, et nous la joignons avec notre clé réelle au jeu de données de recherche auquel nous avons déjà ajouté la clé fictive. Comme la jointure se produit sur la base de notre clé réelle, en plus de la clé fictive que nous avons générée pour la répartition, cela évitera les partitions asymétriques qui peuvent affecter nos performances lors de la jointure de jeux de données dans Spark.

Conclusion

Il existe de nombreuses techniques pour améliorer les performances et effectuer la mise au point de nos jobs Talend Spark. J’espère que celles que nous avons étudiées dans ce blog vous seront utiles. Vous pouvez maintenez créer plus de jobs Spark sur Talend !

Références :

https://spark.apache.org/docs/latest/tuning.html

https://www.cloudera.com/documentation/enterprise/latest/topics/admin_spark_tuning1.html

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-tuning.html

https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.4/bk_spark-component-guide/content/ch_tuning-spark.html

Participer aux discussions

0 Comments

Laisser un commentaire

Your email address will not be published. Required fields are marked *