¿Cómo ajustar el número de ejecutor de chispa, los núcleos y la memoria de ejecutor?

84

¿Por dónde empezar a ajustar los parámetros mencionados anteriormente? ¿Comenzamos con la memoria del ejecutor y obtenemos el número de ejecutores, o comenzamos con núcleos y obtenemos el número de ejecutor? Seguí el enlace . Sin embargo, tengo una idea de alto nivel, pero todavía no estoy seguro de cómo o por dónde empezar y llegar a una conclusión final.

Ramzy
fuente

Respuestas:

206

La siguiente respuesta cubre los 3 aspectos principales mencionados en el título: número de ejecutores, memoria del ejecutor y número de núcleos. Puede haber otros parámetros como la memoria del controlador y otros que no abordé a partir de esta respuesta, pero me gustaría agregar en un futuro próximo.

Hardware del caso 1: 6 nodos y cada nodo 16 núcleos, 64 GB de RAM

Cada ejecutor es una instancia de JVM. Entonces podemos tener múltiples ejecutores en un solo nodo

Se necesita el primer núcleo y 1 GB para SO y Hadoop Daemons, por lo que están disponibles 15 núcleos, 63 GB de RAM para cada nodo

Comience con cómo elegir el número de núcleos :

Number of cores = Concurrent tasks as executor can run 

So we might think, more concurrent tasks for each executor will give better performance. But research shows that
any application with more than 5 concurrent tasks, would lead to bad show. So stick this to 5.

This number came from the ability of executor and not from how many cores a system has. So the number 5 stays same
even if you have double(32) cores in the CPU.

Número de ejecutores:

Coming back to next step, with 5 as cores per executor, and 15 as total available cores in one Node(CPU) - we come to 
3 executors per node.

So with 6 nodes, and 3 executors per node - we get 18 executors. Out of 18 we need 1 executor (java process) for AM in YARN we get 17 executors

This 17 is the number we give to spark using --num-executors while running from spark-submit shell command

Memoria para cada ejecutor:

From above step, we have 3 executors  per node. And available RAM is 63 GB

So memory for each executor is 63/3 = 21GB. 

However small overhead memory is also needed to determine the full memory request to YARN for each executor.
Formula for that over head is max(384, .07 * spark.executor.memory)

Calculating that overhead - .07 * 21 (Here 21 is calculated as above 63/3)
                            = 1.47

Since 1.47 GB > 384 MB, the over head is 1.47.
Take the above from each 21 above => 21 - 1.47 ~ 19 GB

So executor memory - 19 GB

Números finales - Ejecutores - 17, Núcleos 5, Memoria del ejecutor - 19 GB


Hardware del caso 2: mismo 6 nodos, 32 núcleos, 64 GB

5 es igual para una buena concurrencia

Número de ejecutores para cada nodo = 32/5 ~ 6

Entonces total ejecutores = 6 * 6 Nodos = 36. Entonces el número final es 36 - 1 para AM = 35

La memoria del ejecutor es: 6 ejecutores por cada nodo. 63/6 ~ 10. La sobrecarga es .07 * 10 = 700 MB. Entonces, redondeando a 1 GB como gastos generales, obtenemos 10-1 = 9 GB

Números finales - Ejecutores - 35, Núcleos 5, Memoria de Ejecutor - 9 GB


Caso 3

Los escenarios anteriores comienzan aceptando el número de núcleos como fijo y pasando al número de ejecutores y memoria.

Ahora, para el primer caso, si creemos que no necesitamos 19 GB, y solo 10 GB son suficientes, los siguientes son los números:

núcleos 5 # de ejecutores para cada nodo = 3

En esta etapa, esto conduciría a 21, y luego a 19 según nuestro primer cálculo. Pero como pensamos que 10 está bien (supongamos que hay poca sobrecarga), entonces no podemos cambiar el número de ejecutores por nodo a 6 (como 63/10). Porque con 6 ejecutores por nodo y 5 núcleos se reduce a 30 núcleos por nodo, cuando solo tenemos 16 núcleos. Entonces también necesitamos cambiar el número de núcleos para cada ejecutor.

Así que calculando de nuevo

El número mágico 5 llega a 3 (cualquier número menor o igual a 5). Entonces, con 3 núcleos y 15 núcleos disponibles, obtenemos 5 ejecutores por nodo. Entonces (5 * 6 -1) = 29 ejecutores

Entonces la memoria es 63/5 ~ 12. La sobrecarga es 12 * .07 = .84 Entonces la memoria del ejecutor es 12 - 1 GB = 11 GB

Los números finales son 29 ejecutores, 3 núcleos, la memoria del ejecutor es de 11 GB


Asignación dinámica:

Nota: Límite superior para el número de ejecutores si la asignación dinámica está habilitada. Entonces, esto dice que la aplicación Spark puede consumir todos los recursos si es necesario. Por lo tanto, en un clúster donde tiene otras aplicaciones en ejecución y también necesitan núcleos para ejecutar las tareas, asegúrese de hacerlo a nivel de clúster. Quiero decir que puede asignar un número específico de núcleos para YARN según el acceso del usuario. Entonces puede crear spark_user y luego dar núcleos (min / max) para ese usuario. Estos límites son para compartir entre Spark y otras aplicaciones que se ejecutan en YARN.

spark.dynamicAllocation.enabled: cuando se establece en verdadero, no es necesario mencionar los ejecutores. La razón es la siguiente:

El número de parámetros estáticos que damos en Spark-Submit es para toda la duración del trabajo. Sin embargo, si la asignación dinámica entra en escena, habría diferentes etapas como

Por que empezar:

Número inicial de ejecutores ( spark.dynamicAllocation.initialExecutors ) para comenzar

Cuántos :

Luego, según la carga (tareas pendientes) cuántas solicitar. Estos serían eventualmente los números que damos en Spark-Submit de manera estática. Entonces, una vez que se establecen los números de ejecutor inicial, vamos a los números min ( spark.dynamicAllocation.minExecutors ) y max ( spark.dynamicAllocation.maxExecutors ).

Cuándo pedir o dar:

¿Cuándo solicitamos nuevos ejecutores ( spark.dynamicAllocation.schedulerBacklogTimeout )? Ha habido tareas pendientes durante tanto tiempo. así que solicítalo. El número de ejecutores solicitados en cada ronda aumenta exponencialmente con respecto a la ronda anterior. Por ejemplo, una aplicación agregará 1 ejecutor en la primera ronda, y luego 2, 4, 8 y así sucesivamente en las rondas posteriores. En un punto específico, el máximo anterior entra en escena

cuándo regalamos un ejecutor ( spark.dynamicAllocation.executorIdleTimeout ) -

Por favor corríjame si me perdí algo. Lo anterior es mi entendimiento basado en el blog que compartí en cuestión y algunos recursos en línea. Gracias.

Referencias:

Ramzy
fuente
2
Leí en algún lugar que solo hay un ejecutor por nodo en modo independiente, ¿alguna idea al respecto? No lo veo cubierto en tu respuesta.
jangorecki
2
En un clúster independiente, de forma predeterminada obtenemos un ejecutor por trabajador. Necesitamos jugar con spark.executor.cores y un trabajador tiene suficientes núcleos para obtener más de un ejecutor.
Ramzy
Descubrí que mi trabajador utiliza los 32 núcleos sin configurar spark.executor.cores, por lo que puede usar todos los disponibles de forma predeterminada.
jangorecki
Sí, el valor predeterminado para los núcleos es infinito, como dicen. Entonces, Spark puede usar todos los núcleos disponibles a menos que usted especifique.
Ramzy
2
@Ramzy Creo que debe tenerse en cuenta que incluso con la asignación dinámica, aún debe especificar spark.executor.cores para determinar el tamaño de cada ejecutor que Spark va a asignar. De lo contrario, siempre que Spark vaya a asignar un nuevo ejecutor a su aplicación, asignará un nodo completo (si está disponible), incluso si todo lo que necesita son solo cinco núcleos más.
Dan Markhasin
6

Además, depende de su caso de uso, un parámetro de configuración importante es:

spark.memory.fraction(Fracción de (espacio de pila - 300 MB) utilizado para ejecución y almacenamiento) de http://spark.apache.org/docs/latest/configuration.html#memory-management .

Si no usa cache / persist, configúrelo en 0.1 para que tenga toda la memoria para su programa.

Si usa caché / persistir, puede verificar la memoria ocupada por:

sc.getExecutorMemoryStatus.map(a => (a._2._1 - a._2._2)/(1024.0*1024*1024)).sum

¿Lee datos de HDFS o de HTTP?

Nuevamente, un ajuste depende de su caso de uso.

Thomas Decaux
fuente
¿Sabes cómo se vería el comando map al usar pyspark? Uso sc._jsc.sc (). GetExecutorMemoryStatus () para obtener el estado del ejecutor, pero no puedo hacer nada con lo que devuelve ...
Thomas
1
Lo siento @Thomas Decaux , pero ¿te refieres a la configuración spark.memory.storageFraction=0.1? Debido a que AFAIK spark.memory.fractiondecide la cantidad de memoria utilizada Sparktanto para la ejecución como para el almacenamiento (ya lo mencionaste) y solo la spark.memory.storageFractionmemoria (que está disponible para almacenamiento + ejecución) es inmune al desalojo de caché . Por favor vea este enlace
y2k-shubham
@Thomas Si en mi aplicación solo tengo persist (StorageLevel.DISK_ONLY), esta opción también es aplicable, ¿verdad? ¿Afecta solo a una fracción de memoria, pero no afecta a ningún derrame de disco?
jk1