¿Cómo funciona HashPartitioner?

82

Leí la documentación de HashPartitioner. Desafortunadamente, no se explicó mucho excepto las llamadas a la API. Supongo que HashPartitionerparticiona el conjunto distribuido en función del hash de las claves. Por ejemplo, si mis datos son como

(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)

Entonces, el particionador colocaría esto en diferentes particiones con las mismas claves que caen en la misma partición. Sin embargo, no entiendo el significado del argumento del constructor.

new HashPartitoner(numPartitions) //What does numPartitions do?

Para el conjunto de datos anterior, ¿cómo diferirían los resultados si lo hiciera?

new HashPartitoner(1)
new HashPartitoner(2)
new HashPartitoner(10)

Entonces, ¿cómo HashPartitionerfunciona realmente?

Sohaib
fuente

Respuestas:

161

Bueno, hagamos que su conjunto de datos sea un poco más interesante:

val rdd = sc.parallelize(for {
    x <- 1 to 3
    y <- 1 to 2
} yield (x, None), 8)

Tenemos seis elementos:

rdd.count
Long = 6

sin particionador:

rdd.partitioner
Option[org.apache.spark.Partitioner] = None

y ocho particiones:

rdd.partitions.length
Int = 8

Ahora definamos un pequeño ayudante para contar el número de elementos por partición:

import org.apache.spark.rdd.RDD

def countByPartition(rdd: RDD[(Int, None.type)]) = {
    rdd.mapPartitions(iter => Iterator(iter.length))
}

Como no tenemos particionador, nuestro conjunto de datos se distribuye uniformemente entre particiones ( esquema de particionamiento predeterminado en Spark ):

countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)

distribución inicial

Ahora repartamos nuestro conjunto de datos:

import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))

Dado que el parámetro pasado a HashPartitionerdefine el número de particiones, esperamos una partición:

rddOneP.partitions.length
Int = 1

Como solo tenemos una partición, contiene todos los elementos:

countByPartition(rddOneP).collect
Array[Int] = Array(6)

hash-particionador-1

Tenga en cuenta que el orden de los valores después de la mezcla no es determinista.

De la misma manera si usamos HashPartitioner(2)

val rddTwoP = rdd.partitionBy(new HashPartitioner(2))

obtendremos 2 particiones:

rddTwoP.partitions.length
Int = 2

Dado que rddestá particionado por datos clave, ya no se distribuirán de manera uniforme:

countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)

Porque con tener tres claves y solo dos valores diferentes de hashCodemod, numPartitionsno hay nada inesperado aquí:

(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))

Solo para confirmar lo anterior:

rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))

hash-particionador-2

Finalmente con HashPartitioner(7)obtenemos siete particiones, tres no vacías con 2 elementos cada una:

val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddTenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)

hash-particionador-7

Resumen y notas

  • HashPartitioner toma un solo argumento que define el número de particiones
  • los valores se asignan a las particiones mediante hashclaves. hashLa función puede diferir dependiendo del idioma (Scala RDD puede usar hashCode, DataSetsuse MurmurHash 3, PySpark, portable_hash).

    En un caso simple como este, donde la clave es un número entero pequeño, puede asumir que hashes una identidad ( i = hash(i)).

    La API de Scala se utiliza nonNegativeModpara determinar la partición según el hash calculado,

  • si la distribución de claves no es uniforme, puede terminar en situaciones en las que parte de su clúster está inactivo

  • las claves tienen que ser hash. Puede consultar mi respuesta para obtener una lista como clave para reducirByKey de PySpark para leer sobre problemas específicos de PySpark. Otro posible problema se destaca en la documentación de HashPartitioner :

    Las matrices Java tienen códigos hash que se basan en las identidades de las matrices en lugar de su contenido, por lo que intentar particionar un RDD [Array [ ]] o RDD [(Array [ ], _)] usando un HashPartitioner producirá un resultado inesperado o incorrecto.

  • En Python 3, debe asegurarse de que el hash sea coherente. Consulte ¿Qué significa Excepción: la aleatoriedad del hash de la cadena debe desactivarse a través de PYTHONHASHSEED mean en pyspark?

  • El particionador hash no es inyectivo ni sobreyectivo. Se pueden asignar varias claves a una sola partición y algunas particiones pueden permanecer vacías.

  • Tenga en cuenta que actualmente los métodos basados ​​en hash no funcionan en Scala cuando se combinan con clases de casos definidas por REPL ( igualdad de clases de casos en Apache Spark ).

  • HashPartitioner(o cualquier otro Partitioner) baraja los datos. A menos que la partición se reutilice entre varias operaciones, no reduce la cantidad de datos que se barajan.

zero323
fuente
Excelente redacción, gracias. Sin embargo, he notado en las imágenes que tienes (1, None)con hash(2) % Pla que P es la partición. ¿No debería serlo hash(1) % P?
javamonkey79
Estoy usando Spark 2.2 y no hay partitionByAPI en rdd. hay un partitionBy en dataframe.write, pero no toma Partitioner como argumento.
hakunami
respuesta impresionante ... shuffle se basa en partioner, un buen partioner puede reducir la cantidad de datos barajados.
León
Gran respuesta. Tengo una pregunta que no obtengo una respuesta sólida en Internet. Cuando usamos df.
dsk
@dsk, si no especifica una clave, creo que la repartición usa RoundRobinPartitioning. Algo de discusión aquí .
Mike Souder
6

RDDestá distribuido, esto significa que está dividido en varias partes. Cada una de estas particiones está potencialmente en una máquina diferente. El particionador hash con argumento numPartitionselige en qué partición colocar el par (key, value)de la siguiente manera:

  1. Crea exactamente numPartitionsparticiones.
  2. Lugares (key, value)en partición con númeroHash(key) % numPartitions
abalcerek
fuente
3

El HashPartitioner.getPartitionmétodo toma una clave como argumento y devuelve el índice de la partición a la que pertenece la clave. El particionador tiene que saber cuáles son los índices válidos, por lo que devuelve números en el rango correcto. El número de particiones se especifica mediante elnumPartitions argumento del constructor.

La implementación regresa aproximadamente key.hashCode() % numPartitions. Consulte Partitioner.scala para obtener más detalles.

Daniel Darabos
fuente