¿Cómo definir la partición de DataFrame?

128

Empecé a usar Spark SQL y DataFrames en Spark 1.4.0. Quiero definir un particionador personalizado en DataFrames, en Scala, pero no veo cómo hacerlo.

Una de las tablas de datos con las que estoy trabajando contiene una lista de transacciones, por cuenta, silimar para el siguiente ejemplo.

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

Al menos inicialmente, la mayoría de los cálculos ocurrirán entre las transacciones dentro de una cuenta. Por lo tanto, me gustaría tener los datos particionados para que todas las transacciones de una cuenta estén en la misma partición Spark.

Pero no veo una manera de definir esto. La clase DataFrame tiene un método llamado 'repartición (Int)', donde puede especificar el número de particiones para crear. Pero no veo ningún método disponible para definir un particionador personalizado para un DataFrame, como puede especificarse para un RDD.

Los datos de origen se almacenan en Parquet. Vi que al escribir un DataFrame en Parquet, puede especificar una columna para particionar, por lo que presumiblemente podría decirle a Parquet que particione sus datos en la columna 'Cuenta'. Pero podría haber millones de cuentas, y si entiendo Parquet correctamente, crearía un directorio distinto para cada Cuenta, por lo que no parecía una solución razonable.

¿Hay alguna manera de hacer que Spark particione este DataFrame para que todos los datos de una Cuenta estén en la misma partición?

rastrillo
fuente
consulte este enlace stackoverflow.com/questions/23127329/…
Abhishek Choudhary
Si puede decirle a Parquet que particione por cuenta, probablemente pueda particionar int(account/someInteger)y así obtener un número razonable de cuentas por directorio.
Paul
1
@ABC: vi ese enlace. Estaba buscando el equivalente de ese partitionBy(Partitioner)método, pero para DataFrames en lugar de RDD. Veo ahora que partitionBysólo está disponible para Par DDR, no está seguro de por qué es así.
rastrillo
@Paul: consideré hacer lo que tú describes. Algunas cosas me detuvieron:
rastrillo
continuando .... (1) Eso es para "Partición de parquet". No pude encontrar ningún documento que indique que la partición Spark realmente usará la partición Parquet. (2) Si entiendo los documentos de Parquet, necesito definir un nuevo campo "foo", entonces cada directorio de Parquet tendría un nombre como "foo = 123". Pero si construyo una consulta que involucra AccountID , ¿cómo Spark / hive / parquet sabría que hay algún vínculo entre foo y AccountID ?
rastrillo

Respuestas:

177

Chispa> = 2.3.0

SPARK-22614 expone la división de rango.

val partitionedByRange = df.repartitionByRange(42, $"k")

partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
//    +- LocalRelation [_1#2, _2#3]
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
// 
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]

SPARK-22389 expone el particionamiento de formato externo en la fuente de datos API v2 .

Chispa> = 1.6.0

En Spark> = 1.6 es posible utilizar particiones por columna para consultas y almacenamiento en caché. Ver: SPARK-11410 y SPARK-4849 usando el repartitionmétodo:

val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

val partitioned = df.repartition($"k")
partitioned.explain

// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- Scan PhysicalRDD[_1#5,_2#6]

A diferencia de RDDsSpark Dataset(incluido Dataset[Row]aka DataFrame) no se puede usar un particionador personalizado por ahora. Por lo general, puede abordar eso creando una columna de partición artificial, pero no le dará la misma flexibilidad.

Chispa <1.6.0:

Una cosa que puede hacer es particionar previamente los datos de entrada antes de crear un DataFrame

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner

val schema = StructType(Seq(
  StructField("x", StringType, false),
  StructField("y", LongType, false),
  StructField("z", DoubleType, false)
))

val rdd = sc.parallelize(Seq(
  Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
  Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))

val partitioner = new HashPartitioner(5) 

val partitioned = rdd.map(r => (r.getString(0), r))
  .partitionBy(partitioner)
  .values

val df = sqlContext.createDataFrame(partitioned, schema)

Dado que la DataFramecreación a partir de un RDDrequiere solo una fase de mapa simple, el diseño de partición existente debe conservarse *:

assert(df.rdd.partitions == partitioned.partitions)

De la misma manera que puede reparticionar existentes DataFrame:

sqlContext.createDataFrame(
  df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
  df.schema
)

Entonces parece que no es imposible. La pregunta sigue siendo si tiene sentido. Argumentaré que la mayoría de las veces no lo hace:

  1. Reparticionar es un proceso costoso. En un escenario típico, la mayoría de los datos deben ser serializados, barajados y deserializados. Por otro lado, el número de operaciones que pueden beneficiarse de los datos particionados previamente es relativamente pequeño y se limita aún más si la API interna no está diseñada para aprovechar esta propiedad.

    • se une en algunos escenarios, pero requeriría un soporte interno,
    • funciones de ventana llamadas con particionador coincidente. Igual que el anterior, limitado a una sola definición de ventana. Sin embargo, ya está particionado internamente, por lo que la partición previa puede ser redundante,
    • agregaciones simples con GROUP BY: es posible reducir la huella de memoria de los búferes temporales **, pero el costo general es mucho mayor. Más o menos equivalente a groupByKey.mapValues(_.reduce)(comportamiento actual) frente a reduceByKey(prepartición). Es poco probable que sea útil en la práctica.
    • compresión de datos con SqlContext.cacheTable. Como parece que está usando codificación de longitud de ejecución, la aplicación OrderedRDDFunctions.repartitionAndSortWithinPartitionspodría mejorar la relación de compresión.
  2. El rendimiento depende en gran medida de una distribución de las claves. Si está sesgado, dará como resultado una utilización de recursos subóptima. En el peor de los casos, será imposible terminar el trabajo.

  3. Un punto de usar una API declarativa de alto nivel es aislarse de los detalles de implementación de bajo nivel. Como ya lo mencionaron @dwysakowicz y @RomiKuntsman, la optimización es un trabajo del Optimizador Catalyst . Es una bestia bastante sofisticada y realmente dudo que puedas mejorarla fácilmente sin sumergirte mucho más en sus partes internas.

Conceptos relacionados

Particionamiento con fuentes JDBC :

Las fuentes de datos JDBC admiten predicatesargumentos . Se puede usar de la siguiente manera:

sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)

Crea una única partición JDBC por predicado. Tenga en cuenta que si los conjuntos creados con predicados individuales no son disjuntos, verá duplicados en la tabla resultante.

partitionBymétodo enDataFrameWriter :

Spark DataFrameWriterproporciona un partitionBymétodo que puede usarse para "particionar" datos en escritura. Separa los datos en escritura usando el conjunto de columnas provisto

val df = Seq(
  ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")

df.write.partitionBy("k").json("/tmp/foo.json")

Esto permite que el predicado empuje hacia abajo en la lectura de consultas basadas en la clave:

val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")

pero no es equivalente a DataFrame.repartition. En particular agregaciones como:

val cnts = df1.groupBy($"k").sum()

aún requerirá TungstenExchange:

cnts.explain

// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
//    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
//       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json

bucketByMétodo enDataFrameWriter (Spark> = 2.0):

bucketBytiene aplicaciones similares partitionBypero solo está disponible para tablas ( saveAsTable). La información de agrupación se puede utilizar para optimizar las uniones:

// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")

// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
//    :- *Sort [k#41 ASC NULLS FIRST], false, 0
//    :  +- *Project [k#41, v#42]
//    :     +- *Filter isnotnull(k#41)
//    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
//    +- *Sort [k#46 ASC NULLS FIRST], false, 0
//       +- *Project [k#46, v2#47]
//          +- *Filter isnotnull(k#46)
//             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>

* Por diseño de partición me refiero solo a una distribución de datos. partitionedRDD ya no tiene un particionador. ** Suponiendo que no hay proyección temprana. Si la agregación cubre solo un pequeño subconjunto de columnas, probablemente no haya ganancia alguna.

cero323
fuente
@por casualidad Sí y no. El diseño de los datos se conservará, pero AFAIK no le dará beneficios como la poda de particiones.
zero323
@ zero323 Gracias, ¿hay alguna forma de verificar la asignación de particiones del archivo de parquet para validar df.save.write y guardar el diseño? Y si hago df.repartition ("A"), entonces hago df.write.repartitionBy ("B"), la estructura de carpetas físicas estará dividida por B, y dentro de cada carpeta de valor B, seguirá manteniendo la partición por ¿UNA?
quizás
2
@bychance DataFrameWriter.partitionBylógicamente no es lo mismo que DataFrame.repartition. El anterior no se baraja, simplemente separa la salida. En cuanto a la primera pregunta.- los datos se guardan por partición y no hay barajado. Puede verificarlo fácilmente leyendo archivos individuales. Pero Spark solo no tiene forma de saberlo si esto es lo que realmente quieres.
zero323
11

En Spark <1.6 Si crea un HiveContext, no el viejo SqlContext, puede usar HiveQL DISTRIBUTE BY colX... (se asegura de que cada uno de los N reductores obtenga rangos no superpuestos de x) & CLUSTER BY colX...(acceso directo para Distribuir por y Ordenar por) por ejemplo;

df.registerTempTable("partitionMe")
hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")

No estoy seguro de cómo encaja esto con la API de Spark DF. Estas palabras clave no son compatibles con el SqlContext normal (tenga en cuenta que no necesita tener una meta tienda de colmena para usar HiveContext)

EDITAR: Spark 1.6+ ahora tiene esto en la API nativa de DataFrame

Lobo de noche
fuente
1
¿Se conservan las particiones a medida que se guarda el marco de datos?
Sim
¿Cómo controlas cuántas particiones puedes tener en el ejemplo ql de la colmena? Por ejemplo, en el enfoque de RDD de par, puede hacer esto para crear 5 particiones: particionador val = nuevo HashPartitioner (5)
Minnie
ok, respuesta encontrada, se puede hacer de esta manera: sqlContext.setConf ("spark.sql.shuffle.partitions", "5") No pude editar el comentario anterior ya que perdí el límite de 5 minutos
Minnie
7

Entonces, para comenzar con algún tipo de respuesta:) - No puedes

No soy un experto, pero hasta donde entiendo DataFrames, no son iguales a rdd y DataFrame no tiene tal cosa como Partitioner.

En general, la idea de DataFrame es proporcionar otro nivel de abstracción que maneje tales problemas por sí mismo. Las consultas en DataFrame se traducen en un plan lógico que se traduce aún más en operaciones en RDD. La partición que sugirió probablemente se aplicará automáticamente o al menos debería.

Si no confía en SparkSQL que proporcionará algún tipo de trabajo óptimo, siempre puede transformar DataFrame a RDD [Row] como se sugiere en los comentarios.

Dawid Wysakowicz
fuente
7

Use el DataFrame devuelto por:

yourDF.orderBy(account)

No hay una forma explícita de usar partitionByen un DataFrame, solo en un PairRDD, pero cuando ordena un DataFrame, lo usará en su LogicalPlan y eso ayudará cuando necesite hacer cálculos en cada Cuenta.

Me topé con el mismo problema exacto, con un marco de datos que quiero particionar por cuenta. Supongo que cuando dice "desea particionar los datos para que todas las transacciones de una cuenta estén en la misma partición de Spark", lo desea por escala y rendimiento, pero su código no depende de ello (como usar mapPartitions()etc), ¿verdad?

Romi Kuntsman
fuente
3
¿Qué pasa si su código depende de él porque está usando mapPartitions?
NightWolf
2
Puede convertir el DataFrame en un RDD, y luego Particionarlo (por ejemplo, usando aggregatByKey () y pasar un Partitioner personalizado)
Romi Kuntsman
5

Pude hacer esto usando RDD. Pero no sé si esta es una solución aceptable para usted. Una vez que tenga el DF disponible como RDD, puede aplicar repartitionAndSortWithinPartitionspara realizar un reparto de datos personalizado.

Aquí hay una muestra que utilicé:

class DatePartitioner(partitions: Int) extends Partitioner {

  override def getPartition(key: Any): Int = {
    val start_time: Long = key.asInstanceOf[Long]
    Objects.hash(Array(start_time)) % partitions
  }

  override def numPartitions: Int = partitions
}

myRDD
  .repartitionAndSortWithinPartitions(new DatePartitioner(24))
  .map { v => v._2 }
  .toDF()
  .write.mode(SaveMode.Overwrite)
Desarrollador
fuente