Estoy tratando de entender la relación entre la cantidad de núcleos y la cantidad de ejecutores cuando se ejecuta un trabajo de Spark en YARN.
El entorno de prueba es el siguiente:
- Número de nodos de datos: 3
- Especificaciones de la máquina del nodo de datos:
- CPU: Core i7-4790 (# de núcleos: 4, # de hilos: 8)
- RAM: 32 GB (8 GB x 4)
- HDD: 8 TB (2 TB x 4)
Red: 1 Gb
Versión Spark: 1.0.0
Versión de Hadoop: 2.4.0 (Hortonworks HDP 2.1)
Generar flujo de trabajo: sc.textFile -> filter -> map -> filter -> mapToPair -> reduceByKey -> map -> saveAsTextFile
Datos de entrada
- Tipo: archivo de texto único
- Tamaño: 165GB
- Número de líneas: 454,568,833
Salida
- Número de líneas después del segundo filtro: 310,640,717
- Número de líneas del archivo de resultados: 99,848,268
- Tamaño del archivo de resultados: 41 GB
El trabajo se ejecutó con las siguientes configuraciones:
--master yarn-client --executor-memory 19G --executor-cores 7 --num-executors 3
(Ejecutores por nodo de datos, use tanto como núcleos)--master yarn-client --executor-memory 19G --executor-cores 4 --num-executors 3
(# de núcleos reducidos)--master yarn-client --executor-memory 4G --executor-cores 2 --num-executors 12
(menos núcleo, más ejecutor)
Tiempos transcurridos:
50 min 15 seg
55 min 48 seg
31 min 23 seg
Para mi sorpresa, (3) fue mucho más rápido.
Pensé que (1) sería más rápido, ya que habría menos comunicación entre ejecutores al barajar.
Aunque el número de núcleos de (1) es menor que (3), el número de núcleos no es el factor clave ya que 2) funcionó bien.
(Se agregaron los siguientes después de la respuesta de pwilmot).
Para la información, la captura de pantalla del monitor de rendimiento es la siguiente:
- Resumen del nodo de datos de Ganglia para (1): el trabajo comenzó a las 04:37.
- Resumen de nodo de datos de Ganglia para (3): el trabajo comenzó a las 19:47. Por favor ignore el gráfico antes de ese momento.
El gráfico se divide aproximadamente en 2 secciones:
- Primero: desde el principio para reducir ByKey: CPU intensiva, sin actividad de red
- Segundo: después de reduceByKey: CPU baja, se realiza la E / S de red.
Como muestra el gráfico, (1) puede usar tanta potencia de CPU como se le dio. Por lo tanto, podría no ser el problema del número de subprocesos.
¿Cómo explicar este resultado?
fuente
Respuestas:
La explicación se dio en un artículo en el blog de Cloudera, How-to: Tune Your Apache Spark Jobs (Part 2) .
fuente
yarn.scheduler.capacity.resource-calculator
deshabilitado, que es el valor predeterminado. Esto se debe a que, de manera predeterminada, se programa por memoria y no por CPU.A medida que ejecuta su aplicación spark sobre HDFS, según Sandy Ryza
Por lo tanto, creo que su primera configuración es más lenta que la tercera debido al mal rendimiento de E / S de HDFS
fuente
No he jugado con esta configuración, así que esto es solo especulación, pero si pensamos en este problema como núcleos e hilos normales en un sistema distribuido, en su clúster puede usar hasta 12 núcleos (máquinas 4 * 3) y 24 hilos (8 * 3 máquinas). En los primeros dos ejemplos, le está dando a su trabajo una cantidad justa de núcleos (espacio de cómputo potencial), pero la cantidad de subprocesos (trabajos) para ejecutar en esos núcleos es tan limitada que no puede utilizar gran parte de la potencia de procesamiento asignada y, por lo tanto, el trabajo es más lento a pesar de que hay más recursos de cálculo asignados.
Usted menciona que su preocupación estaba en el paso de barajar: aunque es bueno limitar la sobrecarga en el paso de barajar, generalmente es mucho más importante utilizar la paralelización del clúster. Piense en el caso extremo: un programa de un solo subproceso con cero aleatorio.
fuente
Creo que la respuesta aquí puede ser un poco más simple que algunas de las recomendaciones aquí.
La pista para mí está en el gráfico de red del clúster. Para la ejecución 1, la utilización es constante a ~ 50 M bytes / s. Para la ejecución 3, la utilización constante se duplica, alrededor de 100 M bytes / s.
Desde la publicación del blog cloudera compartida por DzOrd , puede ver esta importante cita:
Entonces, hagamos algunos cálculos para ver qué rendimiento esperamos si eso es cierto.
Ejecute 1: 19 GB, 7 núcleos, 3 ejecutores
Ejecutar 3: 4 GB, 2 núcleos, 12 ejecutores
Si el trabajo está 100% limitado por concurrencia (el número de subprocesos). Esperaríamos que el tiempo de ejecución esté perfectamente inversamente correlacionado con el número de hilos.
Entonces
ratio_num_threads ~= inv_ratio_runtime
, y parece que tenemos una red limitada.Este mismo efecto explica la diferencia entre la ejecución 1 y la ejecución 2.
Run 2: 19 GB, 4 núcleos, 3 ejecutores
Comparando el número de hilos efectivos y el tiempo de ejecución:
No es tan perfecto como la última comparación, pero aún vemos una caída similar en el rendimiento cuando perdemos hilos.
Ahora para el último bit: ¿por qué es el caso de que obtengamos un mejor rendimiento con más hilos, esp. más hilos que el número de CPU?
Una buena explicación de la diferencia entre paralelismo (lo que obtenemos al dividir los datos en múltiples CPU) y la concurrencia (lo que obtenemos cuando usamos múltiples hilos para trabajar en una sola CPU) se proporciona en esta gran publicación de Rob Pike: Concurrencia No es paralelismo .
La explicación breve es que si un trabajo de Spark está interactuando con un sistema de archivos o una red, la CPU pasa mucho tiempo esperando la comunicación con esas interfaces y no pasa mucho tiempo "haciendo trabajo". Al dar a esas CPU más de 1 tarea para trabajar a la vez, pasan menos tiempo esperando y más tiempo trabajando, y se ve un mejor rendimiento.
fuente
De los excelentes recursos disponibles en la página del paquete Sparklyr de RStudio :
fuente
La asignación dinámica de Spark brinda flexibilidad y asigna recursos dinámicamente. En este número de ejecutores mínimo y máximo se pueden dar. También se puede indicar el número de ejecutores que se deben iniciar al inicio de la aplicación.
Lea a continuación sobre el mismo:
fuente
Hay un pequeño problema en las dos primeras configuraciones, creo. Los conceptos de hilos y núcleos como a continuación. El concepto de subprocesos es que si los núcleos son ideales, utilice ese núcleo para procesar los datos. Por lo tanto, la memoria no se utiliza por completo en los primeros dos casos. Si desea realizar una evaluación comparativa de este ejemplo, elija las máquinas que tienen más de 10 núcleos en cada máquina. Luego haz el punto de referencia.
Pero no dé más de 5 núcleos por ejecutor, habrá un cuello de botella en el rendimiento de E / S.
Por lo tanto, las mejores máquinas para hacer este marcado de banco podrían ser nodos de datos que tienen 10 núcleos.
Especificaciones de la máquina del nodo de datos: CPU: Core i7-4790 (# de núcleos: 10, # de hilos: 20) RAM: 32GB (8GB x 4) HDD: 8TB (2TB x 4)
fuente
Creo que una de las principales razones es la localidad. El tamaño del archivo de entrada es 165G, los bloques relacionados del archivo ciertamente distribuidos en múltiples DataNodes, más ejecutores pueden evitar la copia de la red.
Intente establecer el número de bloques iguales del ejecutor, creo que puede ser más rápido.
fuente