Optimiser son job Spark

Partager l'article

Réduire le temps d’exécution de son job Spark

Optimiser un job Spark est une discipline entre la science et l’art. La science car on peut suivre une méthodologie bien définie, l’art parce que pour quiconque a ouvert une Spark UI… on s’accordera à dire qu’on se sent un peu comme dans Guernica !

Le problème

Avant de commencer à optimiser son job Spark, il faut définir notre problème : qu’est-ce qui ne nous convient pas dans l’exécution actuelle ? Les problèmes les plus courants sont :

  • Temps d’exécution trop long
  • Trop de ressources consommées, on arrive au max de l’infra à disposition
  • Erreurs d’OOM (Out Of Memory) trop fréquents

Les solutions

Une fois qu’on a défini son problème, on peut lister les actions possibles et les prioriser. Mon conseil, commencer par le plus simple et continuer tant que l’objectif n’est pas rempli. La définition de “plus simple” est très subjective et dépend de l’expérience de la personne qui réalisera l’action. Par exemple, je n’ai encore jamais exploré la fonctionnalité de checkpoint  dans Spark. Je classerais donc cette solution par défaut plus bas que d’autres pour le côté inconnue que cela représente pour moi.

Il faut aussi cibler ses solutions par rapport à la problématique à résoudre. Ici, on parlera de temps d’exécution trop long.

1. Supprimer toutes les actions inutiles du code

Surement la solution la plus simple et la plus efficace si vous êtes concernés. Spark est lazy, donc à chaque fois que vous faites une action sur un dataframe, toutes les transformations sont exécutées depuis la lecture des sources de données jusqu’à votre action.

Un count dans un log, un show de debug qui traine, un collect… Toutes ces actions sont à bannir de votre code de prod ! Vous ne devez avoir que des write.

2. Filtrer sur la colonne de partitionnement après la lecture

On va partir du principe que la source de donnée que vous lisez est bien partitionnée. Le partitionning par date est le plus fréquent.

Vous n’avez pas forcément besoin de l’intégralité de la donnée que vous lisez. Identifiez bien la ou les colonnes utilisées pour le partitionning de votre source d’entrée et faites un filtre dessus.

Il n’est pas nécessaire d’effectuer ce filtre en premier. Cependant il faut qu’il soit fait avant votre première transformation de type reduce.

3. Sélectionner uniquement les colonnes utiles

Si votre source de données est stockée dans un format orienté colonne, comme parquet, sélectionner uniquement les colonnes utiles réduira la quantité de données à traiter.

Cependant, depuis Spark 3 ce travail est fait automatiquement.

4. Utilisez des broadcast join

Le broadcast join est une solution formidable pour faire une jointure sans shuffle, à condition qu’un des deux dataframes soit petit.

Que signifie petit ? Par défaut, c’est < 10mo. Cela signifie que si Spark détecte, au moment de la jointure, qu’un dataframe fait moins de 10mo, il diffusera le dataframe à l’ensemble des exécuteurs pour éviter le shuffle.

Mon conseil, augmentez cette valeur ! Le paramètre s’appelle spark.sql.autoBroadcastJoinThreshold. Il faut le maximiser tout en restant cohérant avec son infra.

Par exemple, si mes exécuteurs ont 64Go de mémoire vive, je peux me permettre de monter facilement jusqu’à 1 ou 2 Go. Après, il faut tester et voir ce que ça donne. Pas le choix.

L’autre solution serait de définir programmatiquement le broadcast join comme cela :

df1.join(broadcast(df2))

C’est plus fastidieux si on a beaucoup de cas à gérer, mais cela nous évite d’en rater un car on dépasse la limite de pas grand chose.

Attention, on ne peut pas dépasser 8Go pour un broadcast join.

5. Optimiser l’action utilisée

Quand Spark écrit un dataframe, il n’écrit pas un fichier mais plusieurs dans un dossier. On obtient 2 fichiers _SUCCESS qui servent à indiquer que l’écriture est terminée ainsi que 2 fichiers par partition nommés… on a vu mieux ! Mais en même temps on est pas sensé s’en soucier donc c’est normal que cela ne soit pas humainement compréhensible.

Lorsque notre résultat n’est pas bien gros, de l’ordre de 1Go, il peut être tentant de vouloir simplifier l’écriture pour n’avoir qu’un seul fichier bien nommé sans dossier. On peut utiliser toPandas() comme action afin d’utiliser le connecteur Pandas pour écrire son fichier dans le format voulu.

Sauf si notre dataframe résultant est vraiment très petit, de l’ordre de quelques dizaines de mo, c’est une mauvaise idée. Déjà il y a un risque d’OOM si le driver n’a pas assez de mémoire. Ensuite, vous ne profiterez pas du parallélisme de Spark pour écrire votre résultat.

Vous faites du Spark, faites le jusqu’au bout.

6. Evitez les UDFs, surtout en Python

En Scala, l’impact est mineur.

En Python, même si les dernières versions de Spark améliorent l’efficacité des UDFs, l’impact est énorme.

Dans tous les cas, les UDFs sont des boites noires pour Spark, c’est du code qu’il ne peut pas optimiser. S’il existe une façon de le faire en Spark natif, elle sera au pire de performance équivalente.

Si vous êtes en Pyspark et que vraiment vous ne pouvez pas faire sans UDF, sachez qu’il est possible d’utiliser des UDF Scala (ou java) en Python.

7. Utilisez le cache

Surement la première chose à laquelle on pense quand on veut réduire le temps d’exécution d’un job Spark et pourtant je le le place qu’en 7è position. Pourquoi ? Parce que selon le contexte, c’est beaucoup plus complexe qu’on peut le penser. Dans le meilleur cas de figure, on ajoute son cache et hop on a diviser par 2 ou 3 le temps d’exécution total en fonction du nombre de fois qu’on appliquait une action sur un même dataframe. Mais dans d’autres cas, on observera un gain de temps mineur, voir même un job qui ne fonctionne plus.

Pour mieux comprendre, je vous invite à lire mon article détaillé sur quand faire un cache sur un dataframe.

L’objectif reste de pouvoir utiliser le plus haut niveau de cache pour qu’il soit le plus performant possible. Cependant il faut rester réaliste par rapport au nombre de cache qu’on peut avoir à faire, et à notre capacité de stockage.

Attention, un niveau de cache plus bas de signifie pas qu’on en a plus en réserve. Il est tout à fait possible d’utiliser des VMs avec 50Go de disque dure et 256Go de mémoire vive.

Dernier point, le cache n’est pas garanti. Si entre le moment où vous le demandez et le moment où vous en avez besoin, Spark se retrouve en manque de mémoire, il peut choisir de le supprimer.

Pour bien comprendre tout ça, il peut être nécessaire de fouiller dans les méandres de la Spark UI. Bon chance (comme dirait l’autre).

Une dernière difficulté, c’est cadeau : contrairement aux autres actions, il sera nécessaire de lancer en condition réel votre job Spark encore et encore pour tester chaque cache que vous voudrez placer. Cela peut être long et cher. Faites attention.

8. Les checkpoints

Il arrive que peu importe la façon dont on utilise son cache, rien ne s’améliore. Par exemple j’avais un job qui prenait 2h40 à l’origine, traitait 7To de données pour 10 exécuteurs de 256Go de mémoire vive chacun.

Après avoir ajouté mes caches, j’arrivais à 2h alors que j’avais 4 actions basé sur un même dataframe intermédiaire et que j’estimais pouvoir diviser au moins par 3 le temps total.

Pire, parfois le job prenait 4h, parfois 6h, j’ai même fait un record à 12h.

Après avoir cherché, j’ai remarqué que je pouvais avoir plus ou moins d’exécuteurs qui mourraient pendant l’exécution. Si un exécuteur meurt alors qu’il possède une parti de mon cache, je perds mon cache. Donc Spark doit tout recalculer. C’est comme si je n’avais pas de cache.

Une solution que j’aurais pu mettre en place, c’est le checkpoint. Je n’en dirai pas plus que son concept car je ne l’ai finalement pas utilisé et je n’ai donc pas de retour à en donner.

C’est un cache qui équivaut à écrire sur un stockage décentralisé, en dehors de votre cluster Spark. Comme si on faisait une écriture parquet. Sauf que Spark gère automatiquement sa suppression lorsqu’on en a plus besoin.

Grâce à ça, peu importe le nombre d’exécuteurs qui meurent, je ne perds jamais mon cache. En retour, c’est évidemment beaucoup plus lent et cher qu’un cache normal.

J’ai donc préféré comprendre pourquoi mes exécuteurs mourraient.

9. Comprendre son infra (petit REX bonus)

Je n’ai pas de listes de conseils exhaustifs à vous donner pour cette partie, autre qu’il faut utiliser tout ce qu’on sait sur Spark, son fonctionnement interne et l’infra qu’on utilise. Pour illustrer mon propos, je vous propose de vous raconter la fin de mon histoire commencé au point précédent.

Mon job tourne sur Databricks, un cluster composé de 10 machines. 2 on demand, 8 spots, avec autoscaling de 2 à 10 machines afin d’optimiser les coûts. C’est la configuration qui était en place lorsque je suis arrivé dans l’équipe.

Des instances spot, ça signifie que notre Cloud provider, AWS, nous loue des machines moins cher car elles sont actuellement inutilisées. Mais si la demande de ce type de machine augmente, on les perd.

Ah ba voilà, c’est ça le problème. Je perds mes machines spot !

Après un test rapide avec 100% de machines on demand, j’invalide ma théorie. Rien ne change.

Je regarde alors les metrics d’utilisation du CPU et de la mémoire vive. En CPU ça va, on est en moyenne assez proche du 100% sans y être collé. Quelques trous rapide d’utilisation, qui correspondent aux moments où j’écris des fichiers. La mémoire vive elle, colle les 100% quasi tout du long.

Une mémoire utilisée à quasi 100%, des machines qui tombent sans aucune log d’erreur particulière… D’expérience je me dis que c’est un un OOM qui n’est pas bien remonté par Databricks ou AWS et que je ne peux pas voir dans les logs.

J’augmente alors le nombre de machines, problème toujours pas résolu. Pourtant cette fois-ci j’ai de la marge dans l’utilisation de ma mémoire.

Le problème était en faite beaucoup plus simple : l’autoscaling. Les fameux petits trous dans l’utilisation du CPU, de 1 minute ou 2 à peine, suffisaient à provoquer une réduction du nombre d’exécuteurs. Donc, à perdre mon cache.

Une fois désactivée, le résultat final a été sans appel : 40 minutes au lieu de 2h40. Les coûts supplémentaires liés à la désactivation de l’autoscaling sont ainsi largement gommés par la réduction du temps d’exécution.

10. Suivez votre instinct

J’ai rien de plus à vous proposer, mais c’était dommage de ne pas finir sur un compte rond vous trouvez pas ?

Ce qu’il faut retenir

  • Définissez vous un objectif clair avant de commencer
  • Commencez simple. Plus vite vous obtiendrez des résultats, plus vous gagnerez la confiance du reste de l’équipe pour aller plus loin.
  • Attention à ne pas perdre de temps avec des optimisations déjà gérées par Spark.

Si cet article vous a plus et que vous souhaitez en apprendre plus, vous pouvez aller voir ma formation Spark pour développeurs.

Les prochains événements Hymaïa

Pas d'événements pour le moment... Suivez-nous pour être les premiers informé

Appuyez-vous sur les bonnes compétences Data

Nous vous apporterons une réponse sur mesure en vous délivrant notre savoir technologique et méthodologique.