Spark - repartition () vs coalesce ()

254

De acuerdo con Learning Spark

Tenga en cuenta que repartir sus datos es una operación bastante costosa. Spark también tiene una versión optimizada de repartition()llamadas coalesce()que permite evitar el movimiento de datos, pero solo si está disminuyendo el número de particiones RDD.

Una diferencia que obtengo es que con repartition()el número de particiones se puede aumentar / disminuir, pero con coalesce()el número de particiones solo se puede disminuir.

Si las particiones se distribuyen en varias máquinas y coalesce()se ejecutan, ¿cómo puede evitar el movimiento de datos?

Praveen Sripati
fuente

Respuestas:

354

Evita una barajadura completa . Si se sabe que el número está disminuyendo, entonces el ejecutor puede mantener de forma segura los datos en el número mínimo de particiones, solo moviendo los datos de los nodos adicionales a los nodos que guardamos.

Entonces, sería algo como esto:

Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12

Luego coalescea 2 particiones:

Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)

Observe que el Nodo 1 y el Nodo 3 no requieren que sus datos originales se muevan.

Justin Pihony
fuente
115
Gracias por la respuesta. La documentación debería haber dicho mejor en minimize data movementlugar de avoiding data movement.
Praveen Sripati
12
¿Hay algún caso cuando se repartitiondebe usar en lugar de coalesce?
Niemand
21
@Niemand Creo que la documentación actual cubre esto bastante bien: github.com/apache/spark/blob/… Tenga en cuenta que todo lo que repartitionhace es llamar coalescecon el shuffleparámetro establecido en verdadero. Avísame si eso ayuda.
Justin Pihony
2
¿Es posible reducir el número de archivos de partición que existen? No tengo hdfs, pero tengo problemas con muchos archivos.
2
la repartición será estadísticamente más lenta ya que no sabe que se está reduciendo ... aunque tal vez podrían optimizar eso. Internamente, solo se fusiona con una shuffle = truebandera
Justin Pihony
172

La respuesta de Justin es asombrosa y esta respuesta se profundiza.

El repartitionalgoritmo realiza una combinación completa y crea nuevas particiones con datos que se distribuyen de manera uniforme. Creemos un DataFrame con los números del 1 al 12.

val x = (1 to 12).toList
val numbersDf = x.toDF("number")

numbersDf Contiene 4 particiones en mi máquina.

numbersDf.rdd.partitions.size // => 4

Así es como se dividen los datos en las particiones:

Partition 00000: 1, 2, 3
Partition 00001: 4, 5, 6
Partition 00002: 7, 8, 9
Partition 00003: 10, 11, 12

Hagamos una combinación completa con el repartitionmétodo y obtengamos estos datos en dos nodos.

val numbersDfR = numbersDf.repartition(2)

Así es como numbersDfRse dividen los datos en mi máquina:

Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11

El repartitionmétodo crea nuevas particiones y distribuye uniformemente los datos en las nuevas particiones (la distribución de datos es más uniforme para conjuntos de datos más grandes).

Diferencia entre coalesceyrepartition

coalesce usa particiones existentes para minimizar la cantidad de datos que se barajan. repartitioncrea nuevas particiones y baraja completamente. coalesceda como resultado particiones con diferentes cantidades de datos (a veces particiones que tienen tamaños muy diferentes) y repartitionda como resultado particiones de aproximadamente el mismo tamaño.

¿Es coalesceo repartitionmás rápido?

coalescepuede ejecutarse más rápido que repartition, pero las particiones de tamaño desigual son generalmente más lentas para trabajar que las particiones de igual tamaño. Por lo general, deberá volver a particionar los conjuntos de datos después de filtrar un conjunto de datos grande. En general, he descubierto repartitionque es más rápido porque Spark está diseñado para funcionar con particiones de igual tamaño.

Nota: curiosamente he observado que la repartición puede aumentar el tamaño de los datos en el disco . Asegúrese de ejecutar pruebas cuando esté utilizando repartición / fusión en grandes conjuntos de datos.

Lee esta publicación de blog si desea aún más detalles.

Cuando usarás fusión y reparto en la práctica

Potestades
fuente
8
Gran respuesta @Powers, pero ¿no están sesgados los datos en la Partición A y B? ¿Cómo se distribuye uniformemente?
anwartheravian
Además, ¿cuál es la mejor manera de obtener el tamaño de la partición sin obtener un error OOM? Yo uso rdd.glom().map(len).collect()pero da muchos errores OOM.
anwartheravian
8
@anwartheravian: la partición A y la partición B tienen tamaños diferentes porque el repartitionalgoritmo no distribuye los datos de la misma manera para conjuntos de datos muy pequeños. Solía repartitionorganizar 5 millones de registros en 13 particiones y cada archivo tenía entre 89.3 MB y 89.6 MB, ¡eso es bastante igual!
Powers
1
@Powers este aspecto mejor respuesta con detalle.
Verde
1
Esto explica la diferencia mucho mejor. ¡Gracias!
Abhi
22

Un punto adicional a tener en cuenta aquí es que, como el principio básico de Spark RDD es la inmutabilidad. La repartición o fusión creará un nuevo RDD. El RDD base continuará existiendo con su número original de particiones. En caso de que el caso de uso exija que el RDD persista en la memoria caché, entonces se debe hacer lo mismo para el RDD recién creado.

scala> pairMrkt.repartition(10)
res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26

scala> res16.partitions.length
res17: Int = 10

scala>  pairMrkt.partitions.length
res20: Int = 2
Harikrishnan Ck
fuente
¡Buena esa! esto es fundamental y por lo menos a este prog Scala con experiencia, no es obvio - es decir, ni reparto ni se unen intento de modificar los datos, simplemente la forma en que se distribuye a través de nodos
Doug
1
@Harikrishnan, así que si entendí las otras respuestas correctamente, según ellos en caso de fusión, Spark usa particiones existentes, sin embargo, dado que RDD es inmutable, ¿puede describir cómo Coalesce utiliza las particiones existentes? Según tengo entendido, pensé que Spark agrega nuevas particiones a las particiones existentes en combinación.
Explorer
Pero si el "viejo" RDD ya no se usa como se conoce en el gráfico de ejecución, se borrará de la memoria si no persiste, ¿no?
Markus
15

repartition - Se recomienda usarlo mientras aumenta el número de particiones, ya que implica la combinación aleatoria de todos los datos.

coalesce- Se recomienda usarlo mientras se reduce el número de particiones. Por ejemplo, si tiene 3 particiones y desea reducirlas a 2, coalescemoverá los datos de la tercera partición a las particiones 1 y 2. Las particiones 1 y 2 permanecerán en el mismo contenedor. Por otro lado, repartitionbarajará los datos en todas las particiones, por lo tanto, el uso de la red entre los ejecutores será alto y afectará el rendimiento.

coalescefunciona mejor que repartitionmientras reduce el número de particiones.

Kamalesan C
fuente
Explicación útil
Narendra Maru
11

Lo que se desprende del código y los documentos de código es que coalesce(n)es igual coalesce(n, shuffle = false)y repartition(n)es igual quecoalesce(n, shuffle = true)

Por lo tanto, ambos coalescey repartitionse pueden usar para aumentar el número de particiones

Con shuffle = true, en realidad puede unirse a un mayor número de particiones. Esto es útil si tiene una pequeña cantidad de particiones, digamos 100, potencialmente con unas pocas particiones que son anormalmente grandes.

Otra nota importante para acentuar es que si disminuye drásticamente el número de particiones, debería considerar usar una versión aleatoria de coalesce(igual que repartitionen ese caso). Esto permitirá que sus cálculos se realicen en paralelo en particiones principales (tarea múltiple).

Sin embargo, si está haciendo una fusión drástica, por ejemplo, a numPartitions = 1, esto puede dar lugar a que su cálculo se realice en menos nodos de los que desee (por ejemplo, un nodo en el caso de numPartitions = 1). Para evitar esto, puedes pasar shuffle = true. Esto agregará un paso aleatorio, pero significa que las particiones ascendentes actuales se ejecutarán en paralelo (sea cual sea la partición actual).

Consulte también la respuesta relacionada aquí

kasur
fuente
10

Todas las respuestas están agregando un gran conocimiento a esta pregunta muy frecuente.

Entonces, siguiendo la tradición de la línea de tiempo de esta pregunta, aquí están mis 2 centavos.

Encontré que la repartición es más rápida que la fusión , en un caso muy específico.

En mi aplicación, cuando el número de archivos que estimamos es inferior al umbral determinado, la repartición funciona más rápido.

Esto es lo que quiero decir

if(numFiles > 20)
    df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
else
    df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)

En el fragmento anterior, si mis archivos tenían menos de 20, la fusión tardaba una eternidad en terminar, mientras que el reparto era mucho más rápido y, por lo tanto, el código anterior.

Por supuesto, este número (20) dependerá de la cantidad de trabajadores y la cantidad de datos.

Espero que ayude.

Abhishek
fuente
6

Repartición : baraja los datos en un NUEVO número de particiones.

P.ej. El marco de datos inicial se divide en 200 particiones.

df.repartition(500): Los datos se barajarán de 200 particiones a 500 nuevas particiones.

Fusión : baraja los datos en un número existente de particiones.

df.coalesce(5): Los datos se barajarán de 195 particiones restantes a 5 particiones existentes.

Rahul
fuente
4

Me gustaría agregar a la respuesta de Justin y Power que:

repartitionignorará las particiones existentes y creará otras nuevas. Entonces puede usarlo para corregir el sesgo de datos. Puede mencionar las claves de partición para definir la distribución. El sesgo de datos es uno de los mayores problemas en el espacio de problemas de 'big data'.

coalescefuncionará con particiones existentes y barajará un subconjunto de ellas. No puede corregir el sesgo de datos tanto como lo repartitionhace. Por lo tanto, incluso si es menos costoso, puede que no sea lo que necesita.

Salim
fuente
3

A todas las excelentes respuestas que me gustaría agregar, esa repartitiones una de las mejores opciones para aprovechar la paralelización de datos. Mientrascoalesce ofrece una opción barata para reducir las particiones y es muy útil cuando se escriben datos en HDFS o en algún otro receptor para aprovechar las grandes escrituras.

He encontrado esto útil al escribir datos en formato parquet para obtener la máxima ventaja.

Ashkrit Sharma
fuente
2

Para alguien que tuvo problemas para generar un solo archivo csv de PySpark (AWS EMR) como salida y guardarlo en s3, el uso de la partición ayudó. La razón es que la fusión no puede hacer una mezcla completa, pero la repartición sí. Esencialmente, puede aumentar o disminuir el número de particiones usando la división, pero solo puede disminuir el número de particiones (pero no 1) usando la fusión. Aquí está el código para cualquier persona que intente escribir un csv de AWS EMR en s3:

df.repartition(1).write.format('csv')\
.option("path", "s3a://my.bucket.name/location")\
.save(header = 'true')
Robar
fuente
0

De una manera simple COALESCE: - es solo para disminuir el no de particiones, sin mezclar datos, solo comprime las particiones

REPARACIÓN: es tanto para aumentar como para disminuir el número de particiones, pero se baraja

Ejemplo:-

val rdd = sc.textFile("path",7)
rdd.repartition(10)
rdd.repartition(2)

Ambos funcionan bien

Pero generalmente vamos por estas dos cosas cuando necesitamos ver la salida en un clúster, vamos con esto.

Bujuti Niranjan Reddy
fuente
99
Habrá movimiento de datos con Coalese también.
sun_dare
0

Pero también debe asegurarse de que, si se trata de datos de gran tamaño, los datos que están llegando a los nodos de fusión deben estar altamente configurados. Debido a que todos los datos se cargarán en esos nodos, puede provocar una excepción de memoria. Aunque la reparación es costosa, prefiero usarla. Ya que baraja y distribuye los datos por igual.

Sea prudente para seleccionar entre fusión y reparto.

Arun Goudar
fuente
0

El repartitionalgoritmo realiza una combinación completa de los datos y crea particiones de datos de igual tamaño. coalescecombina particiones existentes para evitar una barajadura completa.

Coalesce funciona bien para tomar un RDD con muchas particiones y combinar particiones en un solo nodo de trabajo para producir un RDD final con menos particiones.

Repartitionreorganizará los datos en su RDD para producir el número final de particiones que solicite. La partición de DataFrames parece un detalle de implementación de bajo nivel que debe ser administrado por el marco, pero no lo es. Al filtrar grandes DataFrames en pequeños, casi siempre debe volver a particionar los datos. Probablemente filtrará DataFrames grandes en marcos más pequeños con frecuencia, así que acostúmbrese a volver a particionar.

Lea esta publicación de blog si desea aún más detalles.

Sambhav Kumar
fuente