¿Cómo se dividen las etapas en tareas en Spark?

143

Supongamos por lo siguiente que solo se está ejecutando un trabajo de Spark en cada momento.

Lo que llego hasta ahora

Esto es lo que entiendo que sucede en Spark:

  1. Cuando SparkContextse crea un, cada nodo de trabajo inicia un ejecutor. Los ejecutores son procesos separados (JVM), que se conectan nuevamente al programa del controlador. Cada ejecutor tiene el frasco del programa controlador. Renunciar a un conductor, apaga a los ejecutores. Cada ejecutor puede contener algunas particiones.
  2. Cuando se ejecuta un trabajo, se crea un plan de ejecución de acuerdo con el gráfico de linaje.
  3. El trabajo de ejecución se divide en etapas, donde las etapas contienen tantas transformaciones y acciones vecinas (en el gráfico de linaje), pero no barajan. Por lo tanto, las etapas están separadas por barajaduras.

imagen 1

Entiendo que

  • Una tarea es un comando enviado desde el controlador a un ejecutor al serializar el objeto Function.
  • El ejecutor deserializa (con el controlador jar) el comando (tarea) y lo ejecuta en una partición.

pero

Pregunta (s)

¿Cómo divido el escenario en esas tareas?

Específicamente:

  1. ¿Las tareas están determinadas por las transformaciones y acciones o pueden ser múltiples transformaciones / acciones en una tarea?
  2. Son las tareas determinadas por la partición (por ejemplo, una tarea por etapa por partición).
  3. ¿Las tareas están determinadas por los nodos (por ejemplo, una tarea por etapa por nodo)?

Lo que pienso (solo respuesta parcial, incluso si es correcto)

En https://0x0fff.com/spark-architecture-shuffle , el shuffle se explica con la imagen

ingrese la descripción de la imagen aquí

y tengo la impresión de que la regla es

cada etapa se divide en # tareas de número de particiones, sin tener en cuenta el número de nodos

Para mi primera imagen, diría que tendría 3 tareas de mapa y 3 tareas de reducción.

Para la imagen de 0x0fff, diría que hay 8 tareas de mapa y 3 tareas de reducción (suponiendo que solo haya tres archivos naranja y tres verde oscuro).

Preguntas abiertas en cualquier caso

¿Es eso correcto? Pero incluso si eso es correcto, mis preguntas anteriores no están todas respondidas, porque todavía está abierto, ya sea que varias operaciones (por ejemplo, múltiples mapas) estén dentro de una tarea o estén separadas en una tarea por operación.

Lo que otros dicen

¿Qué es una tarea en Spark? ¿Cómo ejecuta el trabajador Spark el archivo jar? y ¿Cómo divide el planificador Apache Spark los archivos en tareas? son similares, pero no sentí que mi pregunta fuera respondida claramente allí.

Make42
fuente

Respuestas:

52

Tienes un lindo resumen aquí. Para responder tu pregunta

  • Una separada task Qué necesita ser puesto en marcha para cada partición de datos para cada uno stage. Tenga en cuenta que cada partición probablemente residirá en ubicaciones físicas distintas, por ejemplo, bloques en HDFS o directorios / volúmenes para un sistema de archivos local.

Tenga en cuenta que el envío de Stages es impulsado por el DAG Scheduler. Esto significa que las etapas que no son interdependientes pueden enviarse al clúster para su ejecución en paralelo: esto maximiza la capacidad de paralelización en el clúster. Entonces, si las operaciones en nuestro flujo de datos pueden ocurrir simultáneamente, esperamos ver múltiples etapas lanzadas.

Podemos ver eso en acción en el siguiente ejemplo de juguete en el que hacemos los siguientes tipos de operaciones:

  • cargar dos fuentes de datos
  • realizar alguna operación de mapa en ambas fuentes de datos por separado
  • Únete a ellos
  • realizar algunas operaciones de mapa y filtro en el resultado
  • guardar el resultado

Entonces, ¿con cuántas etapas terminaremos?

  • 1 etapa cada una para cargar las dos fuentes de datos en paralelo = 2 etapas
  • Una tercera etapa que representa el joinque depende de las otras dos etapas.
  • Nota: todas las operaciones de seguimiento que trabajan en los datos unidos pueden realizarse en la misma etapa porque deben suceder secuencialmente. No es beneficioso lanzar etapas adicionales porque no pueden comenzar a trabajar hasta que se complete la operación anterior.

Aquí está ese programa de juguetes

val sfi  = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) }
val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }}
val spj = sfi.join(sp)
val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }}
val sf = sm.filter{ case (k,v) => v % 10 == 0 }
sf.saveAsTextFile("/data/blah/out")

Y aquí está el DAG del resultado.

ingrese la descripción de la imagen aquí

Ahora: ¿cuántas tareas ? El número de tareas debe ser igual a

Suma de ( Stage* #Partitions in the stage)

javadba
fuente
2
¡Gracias! Explique su respuesta con respecto a mi texto: 1) ¿Mi definición de etapas no es exhaustiva? Parece que no cumplí el requisito de que una etapa no puede contener operaciones que podrían ser paralelas. ¿O mi descripción ya lo implica estrictamente? 2) El número de tareas que se deben ejecutar para el trabajo está determinado por el número de particiones, pero no por el número de procesadores o nodos, mientras que el número de tareas que se pueden ejecutar al mismo tiempo depende del número de particiones. procesadores, ¿verdad? 3) ¿Una tarea puede contener múltiples operaciones?
Make42
1
4) ¿Qué quisiste decir con tu última oración? Después de todo, las particiones numéricas pueden variar de una etapa a otra. ¿Quiso decir que así es como configuró su trabajo para todas las etapas?
Make42
@ Make42 Por supuesto, el número de particiones puede variar de una etapa a otra: está en lo correcto. Era mi intención decir sum(..)que tomara en cuenta esa variación.
javadba
wow, tu respuesta fue totalmente correcta pero desafortunadamente, la última oración es definitivamente un concepto incorrecto No significa que los números de partición en una etapa sean iguales al número de procesadores, sin embargo, puede establecer el número de particiones para un RDD de acuerdo con el número de núcleos presentados en su máquina.
epcpu
@epcpu Era un caso especial, pero estoy de acuerdo en que sería engañoso, así que lo estoy eliminando.
javadba
26

Esto podría ayudarlo a comprender mejor las diferentes piezas:

  • Etapa: es una colección de tareas. El mismo proceso se ejecuta en diferentes subconjuntos de datos (particiones).
  • Tarea: representa una unidad de trabajo en una partición de un conjunto de datos distribuido. Entonces, en cada etapa, número de tareas = número de particiones, o como dijiste "una tarea por etapa por partición".
  • Cada ejecutador se ejecuta en un contenedor de hilo, y cada contenedor reside en un nodo.
  • Cada etapa utiliza múltiples ejecutores, a cada ejecutor se le asignan múltiples vcores.
  • Cada vcore puede ejecutar exactamente una tarea a la vez
  • Entonces, en cualquier etapa, se pueden ejecutar múltiples tareas en paralelo. número de tareas en ejecución = número de vcores en uso.
pedram bashiri
fuente
2
Esta es una lectura realmente útil sobre la arquitectura de chispa: 0x0fff.com/spark-architecture
pedram bashiri
No obtuve su número de punto 3. Hasta donde sé, cada nodo puede tener múltiples ejecutores, por lo tanto, de acuerdo con el punto 3: debe haber solo un ejecutor por nodo. ¿Puedes aclarar este punto?
Rituparno Behera
@RituparnoBehera cada nodo puede tener múltiples contenedores y por lo tanto múltiples ejecutores de Spark. Mira este enlace. docs.cloudera.com/runtime/7.0.2/running-spark-applications/…
pedram bashiri
15

Si entiendo correctamente, hay 2 cosas (relacionadas) que te confunden:

1) ¿Qué determina el contenido de una tarea?

2) ¿Qué determina el número de tareas a ejecutar?

El motor de Spark "pega" operaciones simples en rdds consecutivos, por ejemplo:

rdd1 = sc.textFile( ... )
rdd2 = rdd1.filter( ... )
rdd3 = rdd2.map( ... )
rdd3RowCount = rdd3.count

así que cuando se calcula (perezosamente) rdd3, spark generará una tarea por partición de rdd1 y cada tarea ejecutará tanto el filtro como el mapa por línea para dar como resultado rdd3.

El número de tareas está determinado por el número de particiones. Cada RDD tiene un número definido de particiones. Para un RDD de origen que se lee desde HDFS (usando sc.textFile (...) por ejemplo), el número de particiones es el número de divisiones generadas por el formato de entrada. Algunas operaciones en RDD (s) pueden resultar en un RDD con un número diferente de particiones:

rdd2 = rdd1.repartition( 1000 ) will result in rdd2 having 1000 partitions ( regardless of how many partitions rdd1 had ).

Otro ejemplo es une:

rdd3 = rdd1.join( rdd2  , numPartitions = 1000 ) will result in rdd3 having 1000 partitions ( regardless of partitions number of rdd1 and rdd2 ).

(La mayoría) de las operaciones que cambian el número de particiones implican una combinación aleatoria, cuando hacemos, por ejemplo:

rdd2 = rdd1.repartition( 1000 ) 

lo que realmente sucede es que la tarea en cada partición de rdd1 necesita producir una salida final que pueda leerse en la siguiente etapa para que rdd2 tenga exactamente 1000 particiones (¿Cómo lo hacen? Hash u Ordenar ). Las tareas en este lado a veces se denominan "tareas de mapa (lado)". Una tarea que luego se ejecutará en rdd2 actuará en una partición (¡de rdd2!) Y tendría que descubrir cómo leer / combinar las salidas del lado del mapa relevantes para esa partición. Las tareas de este lado a veces se denominan "tareas de reducción (de lado)".

Las 2 preguntas están relacionadas: el número de tareas en una etapa es el número de particiones (común a los rdds consecutivos "pegados" juntos) y el número de particiones de un rdd puede cambiar entre etapas (al especificar el número de particiones para algunos shuffle causando operación por ejemplo).

Una vez que comienza la ejecución de una etapa, sus tareas pueden ocupar espacios de tareas. El número de ranuras de tareas simultáneas es numExecutors * ExecutorCores. En general, estos pueden ser ocupados por tareas de diferentes etapas no dependientes.

Harel Gliksman
fuente