Empecé a usar Spark SQL y DataFrames en Spark 1.4.0. Quiero definir un particionador personalizado en DataFrames, en Scala, pero no veo cómo hacerlo.
Una de las tablas de datos con las que estoy trabajando contiene una lista de transacciones, por cuenta, silimar para el siguiente ejemplo.
Account Date Type Amount
1001 2014-04-01 Purchase 100.00
1001 2014-04-01 Purchase 50.00
1001 2014-04-05 Purchase 70.00
1001 2014-04-01 Payment -150.00
1002 2014-04-01 Purchase 80.00
1002 2014-04-02 Purchase 22.00
1002 2014-04-04 Payment -120.00
1002 2014-04-04 Purchase 60.00
1003 2014-04-02 Purchase 210.00
1003 2014-04-03 Purchase 15.00
Al menos inicialmente, la mayoría de los cálculos ocurrirán entre las transacciones dentro de una cuenta. Por lo tanto, me gustaría tener los datos particionados para que todas las transacciones de una cuenta estén en la misma partición Spark.
Pero no veo una manera de definir esto. La clase DataFrame tiene un método llamado 'repartición (Int)', donde puede especificar el número de particiones para crear. Pero no veo ningún método disponible para definir un particionador personalizado para un DataFrame, como puede especificarse para un RDD.
Los datos de origen se almacenan en Parquet. Vi que al escribir un DataFrame en Parquet, puede especificar una columna para particionar, por lo que presumiblemente podría decirle a Parquet que particione sus datos en la columna 'Cuenta'. Pero podría haber millones de cuentas, y si entiendo Parquet correctamente, crearía un directorio distinto para cada Cuenta, por lo que no parecía una solución razonable.
¿Hay alguna manera de hacer que Spark particione este DataFrame para que todos los datos de una Cuenta estén en la misma partición?
fuente
int(account/someInteger)
y así obtener un número razonable de cuentas por directorio.partitionBy(Partitioner)
método, pero para DataFrames en lugar de RDD. Veo ahora quepartitionBy
sólo está disponible para Par DDR, no está seguro de por qué es así.Respuestas:
Chispa> = 2.3.0
SPARK-22614 expone la división de rango.
SPARK-22389 expone el particionamiento de formato externo en la fuente de datos API v2 .
Chispa> = 1.6.0
En Spark> = 1.6 es posible utilizar particiones por columna para consultas y almacenamiento en caché. Ver: SPARK-11410 y SPARK-4849 usando el
repartition
método:A diferencia de
RDDs
SparkDataset
(incluidoDataset[Row]
akaDataFrame
) no se puede usar un particionador personalizado por ahora. Por lo general, puede abordar eso creando una columna de partición artificial, pero no le dará la misma flexibilidad.Chispa <1.6.0:
Una cosa que puede hacer es particionar previamente los datos de entrada antes de crear un
DataFrame
Dado que la
DataFrame
creación a partir de unRDD
requiere solo una fase de mapa simple, el diseño de partición existente debe conservarse *:De la misma manera que puede reparticionar existentes
DataFrame
:Entonces parece que no es imposible. La pregunta sigue siendo si tiene sentido. Argumentaré que la mayoría de las veces no lo hace:
Reparticionar es un proceso costoso. En un escenario típico, la mayoría de los datos deben ser serializados, barajados y deserializados. Por otro lado, el número de operaciones que pueden beneficiarse de los datos particionados previamente es relativamente pequeño y se limita aún más si la API interna no está diseñada para aprovechar esta propiedad.
GROUP BY
: es posible reducir la huella de memoria de los búferes temporales **, pero el costo general es mucho mayor. Más o menos equivalente agroupByKey.mapValues(_.reduce)
(comportamiento actual) frente areduceByKey
(prepartición). Es poco probable que sea útil en la práctica.SqlContext.cacheTable
. Como parece que está usando codificación de longitud de ejecución, la aplicaciónOrderedRDDFunctions.repartitionAndSortWithinPartitions
podría mejorar la relación de compresión.El rendimiento depende en gran medida de una distribución de las claves. Si está sesgado, dará como resultado una utilización de recursos subóptima. En el peor de los casos, será imposible terminar el trabajo.
Conceptos relacionados
Particionamiento con fuentes JDBC :
Las fuentes de datos JDBC admiten
predicates
argumentos . Se puede usar de la siguiente manera:Crea una única partición JDBC por predicado. Tenga en cuenta que si los conjuntos creados con predicados individuales no son disjuntos, verá duplicados en la tabla resultante.
partitionBy
método enDataFrameWriter
:Spark
DataFrameWriter
proporciona unpartitionBy
método que puede usarse para "particionar" datos en escritura. Separa los datos en escritura usando el conjunto de columnas provistoEsto permite que el predicado empuje hacia abajo en la lectura de consultas basadas en la clave:
pero no es equivalente a
DataFrame.repartition
. En particular agregaciones como:aún requerirá
TungstenExchange
:bucketBy
Método enDataFrameWriter
(Spark> = 2.0):bucketBy
tiene aplicaciones similarespartitionBy
pero solo está disponible para tablas (saveAsTable
). La información de agrupación se puede utilizar para optimizar las uniones:* Por diseño de partición me refiero solo a una distribución de datos.
partitioned
RDD ya no tiene un particionador. ** Suponiendo que no hay proyección temprana. Si la agregación cubre solo un pequeño subconjunto de columnas, probablemente no haya ganancia alguna.fuente
DataFrameWriter.partitionBy
lógicamente no es lo mismo queDataFrame.repartition
. El anterior no se baraja, simplemente separa la salida. En cuanto a la primera pregunta.- los datos se guardan por partición y no hay barajado. Puede verificarlo fácilmente leyendo archivos individuales. Pero Spark solo no tiene forma de saberlo si esto es lo que realmente quieres.En Spark <1.6 Si crea un
HiveContext
, no el viejoSqlContext
, puede usar HiveQLDISTRIBUTE BY colX...
(se asegura de que cada uno de los N reductores obtenga rangos no superpuestos de x) &CLUSTER BY colX...
(acceso directo para Distribuir por y Ordenar por) por ejemplo;No estoy seguro de cómo encaja esto con la API de Spark DF. Estas palabras clave no son compatibles con el SqlContext normal (tenga en cuenta que no necesita tener una meta tienda de colmena para usar HiveContext)
EDITAR: Spark 1.6+ ahora tiene esto en la API nativa de DataFrame
fuente
Entonces, para comenzar con algún tipo de respuesta:) - No puedes
No soy un experto, pero hasta donde entiendo DataFrames, no son iguales a rdd y DataFrame no tiene tal cosa como Partitioner.
En general, la idea de DataFrame es proporcionar otro nivel de abstracción que maneje tales problemas por sí mismo. Las consultas en DataFrame se traducen en un plan lógico que se traduce aún más en operaciones en RDD. La partición que sugirió probablemente se aplicará automáticamente o al menos debería.
Si no confía en SparkSQL que proporcionará algún tipo de trabajo óptimo, siempre puede transformar DataFrame a RDD [Row] como se sugiere en los comentarios.
fuente
Use el DataFrame devuelto por:
No hay una forma explícita de usar
partitionBy
en un DataFrame, solo en un PairRDD, pero cuando ordena un DataFrame, lo usará en su LogicalPlan y eso ayudará cuando necesite hacer cálculos en cada Cuenta.Me topé con el mismo problema exacto, con un marco de datos que quiero particionar por cuenta. Supongo que cuando dice "desea particionar los datos para que todas las transacciones de una cuenta estén en la misma partición de Spark", lo desea por escala y rendimiento, pero su código no depende de ello (como usar
mapPartitions()
etc), ¿verdad?fuente
Pude hacer esto usando RDD. Pero no sé si esta es una solución aceptable para usted. Una vez que tenga el DF disponible como RDD, puede aplicar
repartitionAndSortWithinPartitions
para realizar un reparto de datos personalizado.Aquí hay una muestra que utilicé:
fuente