Bueno, hagamos que su conjunto de datos sea un poco más interesante:
val rdd = sc.parallelize(for {
x <- 1 to 3
y <- 1 to 2
} yield (x, None), 8)
Tenemos seis elementos:
rdd.count
Long = 6
sin particionador:
rdd.partitioner
Option[org.apache.spark.Partitioner] = None
y ocho particiones:
rdd.partitions.length
Int = 8
Ahora definamos un pequeño ayudante para contar el número de elementos por partición:
import org.apache.spark.rdd.RDD
def countByPartition(rdd: RDD[(Int, None.type)]) = {
rdd.mapPartitions(iter => Iterator(iter.length))
}
Como no tenemos particionador, nuestro conjunto de datos se distribuye uniformemente entre particiones ( esquema de particionamiento predeterminado en Spark ):
countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)
Ahora repartamos nuestro conjunto de datos:
import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))
Dado que el parámetro pasado a HashPartitioner
define el número de particiones, esperamos una partición:
rddOneP.partitions.length
Int = 1
Como solo tenemos una partición, contiene todos los elementos:
countByPartition(rddOneP).collect
Array[Int] = Array(6)
Tenga en cuenta que el orden de los valores después de la mezcla no es determinista.
De la misma manera si usamos HashPartitioner(2)
val rddTwoP = rdd.partitionBy(new HashPartitioner(2))
obtendremos 2 particiones:
rddTwoP.partitions.length
Int = 2
Dado que rdd
está particionado por datos clave, ya no se distribuirán de manera uniforme:
countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)
Porque con tener tres claves y solo dos valores diferentes de hashCode
mod, numPartitions
no hay nada inesperado aquí:
(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))
Solo para confirmar lo anterior:
rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))
Finalmente con HashPartitioner(7)
obtenemos siete particiones, tres no vacías con 2 elementos cada una:
val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddTenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)
Resumen y notas
HashPartitioner
toma un solo argumento que define el número de particiones
los valores se asignan a las particiones mediante hash
claves. hash
La función puede diferir dependiendo del idioma (Scala RDD puede usar hashCode
, DataSets
use MurmurHash 3, PySpark, portable_hash
).
En un caso simple como este, donde la clave es un número entero pequeño, puede asumir que hash
es una identidad ( i = hash(i)
).
La API de Scala se utiliza nonNegativeMod
para determinar la partición según el hash calculado,
si la distribución de claves no es uniforme, puede terminar en situaciones en las que parte de su clúster está inactivo
las claves tienen que ser hash. Puede consultar mi respuesta para obtener una lista como clave para reducirByKey de PySpark para leer sobre problemas específicos de PySpark. Otro posible problema se destaca en la documentación de HashPartitioner :
Las matrices Java tienen códigos hash que se basan en las identidades de las matrices en lugar de su contenido, por lo que intentar particionar un RDD [Array [ ]] o RDD [(Array [ ], _)] usando un HashPartitioner producirá un resultado inesperado o incorrecto.
En Python 3, debe asegurarse de que el hash sea coherente. Consulte ¿Qué significa Excepción: la aleatoriedad del hash de la cadena debe desactivarse a través de PYTHONHASHSEED mean en pyspark?
El particionador hash no es inyectivo ni sobreyectivo. Se pueden asignar varias claves a una sola partición y algunas particiones pueden permanecer vacías.
Tenga en cuenta que actualmente los métodos basados en hash no funcionan en Scala cuando se combinan con clases de casos definidas por REPL ( igualdad de clases de casos en Apache Spark ).
HashPartitioner
(o cualquier otro Partitioner
) baraja los datos. A menos que la partición se reutilice entre varias operaciones, no reduce la cantidad de datos que se barajan.
(1, None)
conhash(2) % P
la que P es la partición. ¿No debería serlohash(1) % P
?partitionBy
API en rdd. hay un partitionBy en dataframe.write, pero no toma Partitioner como argumento.RDD
está distribuido, esto significa que está dividido en varias partes. Cada una de estas particiones está potencialmente en una máquina diferente. El particionador hash con argumentonumPartitions
elige en qué partición colocar el par(key, value)
de la siguiente manera:numPartitions
particiones.(key, value)
en partición con númeroHash(key) % numPartitions
fuente
El
HashPartitioner.getPartition
método toma una clave como argumento y devuelve el índice de la partición a la que pertenece la clave. El particionador tiene que saber cuáles son los índices válidos, por lo que devuelve números en el rango correcto. El número de particiones se especifica mediante elnumPartitions
argumento del constructor.La implementación regresa aproximadamente
key.hashCode() % numPartitions
. Consulte Partitioner.scala para obtener más detalles.fuente