Actualizar
Esta respuesta sigue siendo válida e informativo, aunque las cosas están ahora mejor desde 2.2 / 2.3, que añade soporte integrado para el codificador Set
, Seq
, Map
, Date
, Timestamp
, y BigDecimal
. Si te limitas a crear tipos con solo clases de casos y los tipos habituales de Scala, deberías estar bien solo con lo implícito SQLImplicits
.
Desafortunadamente, prácticamente no se ha agregado nada para ayudar con esto. Buscar @since 2.0.0
en Encoders.scala
o SQLImplicits.scala
encuentra cosas principalmente relacionadas con tipos primitivos (y algunos ajustes de clases de casos). Entonces, lo primero que hay que decir: actualmente no hay un soporte realmente bueno para codificadores de clase personalizados . Con eso fuera del camino, lo que sigue son algunos trucos que hacen un trabajo tan bueno como podemos esperar, dado lo que tenemos actualmente a nuestra disposición. Como descargo de responsabilidad por adelantado: esto no funcionará perfectamente y haré todo lo posible para que todas las limitaciones sean claras y directas.
Cuál es exactamente el problema
Cuando desee crear un conjunto de datos, Spark "requiere un codificador (para convertir un objeto JVM de tipo T ay desde la representación interna de Spark SQL) que generalmente se crea automáticamente mediante implicits desde a SparkSession
, o puede crearse explícitamente llamando a métodos estáticos on Encoders
"(tomado de los documentos encreateDataset
). Un codificador tomará la forma Encoder[T]
donde T
está el tipo que está codificando. La primera sugerencia es agregar import spark.implicits._
(que le proporciona estos codificadores implícitos) y la segunda sugerencia es pasar explícitamente el codificador implícito utilizando este conjunto de funciones relacionadas con el codificador.
No hay codificador disponible para clases regulares, por lo que
import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
le dará el siguiente error de tiempo de compilación relacionado implícito:
No se puede encontrar el codificador para el tipo almacenado en un conjunto de datos. Los tipos primitivos (Int, String, etc.) y los tipos de productos (clases de caso) son compatibles con la importación de sqlContext.implicits. En futuras versiones se agregará soporte para serializar otros tipos.
Sin embargo, si ajusta el tipo que acaba de usar para obtener el error anterior en alguna clase que se extiende Product
, el error se retrasa confusamente en tiempo de ejecución, por lo que
import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
Compila bien, pero falla en tiempo de ejecución con
java.lang.UnsupportedOperationException: no se ha encontrado ningún codificador para MyObj
La razón de esto es que los codificadores que Spark crea con los implicits en realidad solo se hacen en tiempo de ejecución (a través de la reescalación de escala). En este caso, todas las comprobaciones de Spark en tiempo de compilación es que la clase más externa se extiende Product
(lo que hacen todas las clases de casos), y solo se da cuenta en el tiempo de ejecución de que todavía no sabe qué hacer MyObj
(el mismo problema ocurre si intento hacer a Dataset[(Int,MyObj)]
- Spark espera hasta el tiempo de ejecución para vomitar MyObj
). Estos son problemas centrales que necesitan urgentemente ser solucionados:
- algunas clases que amplían la
Product
compilación a pesar de fallar siempre en tiempo de ejecución y
- no hay forma de pasar codificadores personalizados para tipos anidados (no tengo forma de alimentar a Spark con un codificador solo para
MyObj
que sepa cómo codificar Wrap[MyObj]
o (Int,MyObj)
).
Solo usa kryo
La solución que todos sugieren es usar el kryo
codificador.
import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
Sin embargo, esto se vuelve bastante tedioso rápidamente. Especialmente si su código está manipulando todo tipo de conjuntos de datos, uniéndose, agrupando, etc. Usted termina acumulando un montón de problemas adicionales. Entonces, ¿por qué no hacer un implícito que hace todo esto automáticamente?
import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) =
org.apache.spark.sql.Encoders.kryo[A](ct)
Y ahora, parece que puedo hacer casi cualquier cosa que quiera (el siguiente ejemplo no funcionará en el lugar spark-shell
donde spark.implicits._
se importa automáticamente)
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i, d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!
O casi El problema es que el uso de kryo
leads lleva a Spark a almacenar cada fila del conjunto de datos como un objeto binario plano. Para map
, filter
, foreach
que es suficiente, pero para operaciones como join
, Spark realmente las necesita para ser separados en columnas. Al inspeccionar el esquema para d2
o d3
, verá que solo hay una columna binaria:
d2.printSchema
// root
// |-- value: binary (nullable = true)
Solución parcial para tuplas.
Entonces, usando la magia de las implicidades en Scala (más en 6.26.3 Resolución de sobrecarga ), puedo hacer una serie de implicidades que harán el mejor trabajo posible, al menos para las tuplas, y funcionarán bien con las implicidades existentes:
import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._ // we can still take advantage of all the old implicits
implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)
implicit def tuple2[A1, A2](
implicit e1: Encoder[A1],
e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)
implicit def tuple3[A1, A2, A3](
implicit e1: Encoder[A1],
e2: Encoder[A2],
e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)
// ... you can keep making these
Luego, armado con estas dificultades, puedo hacer que mi ejemplo anterior funcione, aunque con un cambio de nombre de columna
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
Todavía no he encontrado la manera de obtener los nombres de tupla esperados ( _1
, _2
, ...) de forma predeterminada sin cambiar el nombre de ellos - si alguien quiere jugar un poco con esto, este es el lugar donde el nombre "value"
se introduce y esto es donde la tupla generalmente se agregan nombres. Sin embargo, el punto clave es que ahora tengo un buen esquema estructurado:
d4.printSchema
// root
// |-- _1: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
// |-- _2: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
Entonces, en resumen, esta solución alternativa:
- nos permite obtener columnas separadas para las tuplas (para que podamos unirnos nuevamente en las tuplas, ¡sí!)
- nuevamente podemos confiar en las implicidades (así que no hay necesidad de pasar por
kryo
todas partes)
- es casi totalmente compatible con versiones anteriores
import spark.implicits._
(con algunos cambios de nombre)
- no no vamos a unir en las
kyro
columnas binarios serializados, y mucho menos en los campos de los que pueden tener
- tiene el efecto secundario desagradable de cambiar el nombre de algunas de las columnas de tupla a "valor" (si es necesario, esto se puede deshacer mediante la conversión
.toDF
, la especificación de nuevos nombres de columna y la conversión de nuevo a un conjunto de datos, y los nombres de esquema parecen conservarse mediante combinaciones) , donde más se necesitan).
Solución parcial para clases en general.
Este es menos agradable y no tiene una buena solución. Sin embargo, ahora que tenemos la solución de tupla anterior, tengo el presentimiento de que la solución de conversión implícita de otra respuesta también será un poco menos dolorosa ya que puede convertir sus clases más complejas en tuplas. Luego, después de crear el conjunto de datos, probablemente cambie el nombre de las columnas utilizando el enfoque de marco de datos. Si todo va bien, esto es realmente una mejora ya que ahora puedo realizar uniones en los campos de mis clases. Si hubiera utilizado un kryo
serializador binario plano, eso no hubiera sido posible.
Aquí está un ejemplo que hace un poco de todo: Tengo una clase MyObj
que tiene campos de tipos Int
, java.util.UUID
y Set[String]
. El primero se cuida solo. El segundo, aunque podría serializar usando kryo
, sería más útil si se almacena como un String
(ya que los UUID
s suelen ser algo contra lo que me quiero unir). El tercero realmente solo pertenece a una columna binaria.
class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])
// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])
// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
Ahora, puedo crear un conjunto de datos con un buen esquema usando esta maquinaria:
val d = spark.createDataset(Seq[MyObjEncoded](
new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]
Y el esquema me muestra columnas I con los nombres correctos y con las dos primeras cosas con las que me puedo unir.
d.printSchema
// root
// |-- i: integer (nullable = false)
// |-- u: string (nullable = true)
// |-- s: binary (nullable = true)
ExpressionEncoder
usando la serialización JSON? En mi caso no puedo salir con tuplas, y Kryo me da una columna binaria ..Usando codificadores genéricos.
Hay dos codificadores genéricos disponibles por ahora
kryo
yjavaSerialization
donde el último se describe explícitamente como:Asumiendo la siguiente clase
Puede usar estos codificadores agregando un codificador implícito:
que se pueden usar juntos de la siguiente manera:
Almacena objetos como
binary
columna, por lo que cuando se convierteDataFrame
usted obtiene el siguiente esquema:También es posible codificar tuplas usando
kryo
codificador para un campo específico:Tenga en cuenta que no dependemos de codificadores implícitos aquí, sino que pasamos el codificador explícitamente, por lo que lo más probable es que esto no funcione con el
toDS
método.Usando conversiones implícitas:
Proporcione conversiones implícitas entre la representación que se puede codificar y la clase personalizada, por ejemplo:
Preguntas relacionadas:
fuente
Set
) que obtengoException in thread "main" java.lang.UnsupportedOperationException: No Encoder found for Set[Bar]
.kryo[Set[Bar]]
. De la misma manera, si la clase contiene un campoBar
, necesita un codificador para un objeto completo. Estos son métodos muy crudos.Bar
, necesita un codificador para un objeto completo". mi pregunta era cómo codificar este "proyecto completo"?Puede usar UDTRegistration y luego Case Classes, Tuples, etc. ¡todo funciona correctamente con su tipo definido por el usuario!
Digamos que quieres usar una enumeración personalizada:
Regístralo así:
Entonces úsalo!
Digamos que quieres usar un registro polimórfico:
... y lo usamos así:
Puede escribir un UDT personalizado que codifique todo en bytes (estoy usando la serialización de Java aquí, pero probablemente sea mejor instrumentar el contexto Kryo de Spark).
Primero defina la clase UDT:
Entonces regístralo:
¡Entonces puedes usarlo!
fuente
Los codificadores funcionan más o menos de la misma manera
Spark2.0
. YKryo
sigue siendo laserialization
opción recomendada .Puedes ver el siguiente ejemplo con spark-shell
Hasta ahora] no había
appropriate encoders
alcance actual, por lo que nuestras personas no estaban codificadas comobinary
valores. Pero eso cambiará una vez que proporcionemos algunosimplicit
codificadores utilizando laKryo
serialización.fuente
En el caso de la clase Java Bean, esto puede ser útil
Ahora puede simplemente leer el DataFrame como DataFrame personalizado
Esto creará un codificador de clase personalizado y no uno binario.
fuente
Mis ejemplos estarán en Java, pero no creo que sea difícil adaptarse a Scala.
He tenido bastante éxito la conversión
RDD<Fruit>
aDataset<Fruit>
utilizar spark.createDataset y Encoders.bean el tiempo queFruit
es un simple Java Bean .Paso 1: Crea el Java Bean simple.
Me apegaría a las clases con tipos primitivos y String como campos antes de que la gente de DataBricks refuerce sus codificadores. Si tiene una clase con objeto anidado, cree otro Java Bean simple con todos sus campos aplanados, para que pueda usar transformaciones RDD para asignar el tipo complejo al más simple.Claro que es un poco de trabajo extra, pero imagino que ayudará mucho en el rendimiento al trabajar con un esquema plano.
Paso 2: Obtenga su conjunto de datos del RDD
¡Y voilá! Enjabonar, enjuagar, repetir.
fuente
Para aquellos que puedan en mi situación, pongo mi respuesta aquí también.
Ser especifico,
Estaba leyendo 'Establecer datos escritos' de SQLContext. Entonces el formato de datos original es DataFrame.
val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1") sample.show()
+---+---+ | a| b| +---+---+ | 1|[1]| +---+---+
Luego conviértalo a RDD usando rdd.map () con el tipo mutable.WrappedArray.
sample .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet)) .collect() .foreach(println)
Resultado:
(1,Set(1))
fuente
Además de las sugerencias ya dadas, otra opción que descubrí recientemente es que puede declarar su clase personalizada, incluido el rasgo
org.apache.spark.sql.catalyst.DefinedByConstructorParams
.Esto funciona si la clase tiene un constructor que usa tipos que el ExpressionEncoder puede entender, es decir, valores primitivos y colecciones estándar. Puede ser útil cuando no puede declarar la clase como una clase de caso, pero no quiere usar Kryo para codificarla cada vez que se incluye en un conjunto de datos.
Por ejemplo, quería declarar una clase de caso que incluyera un vector Breeze. El único codificador que podría manejar eso normalmente sería Kryo. Pero si declaraba una subclase que extendía Breeze DenseVector y DefinedByConstructorParams, ExpressionEncoder entendía que podía serializarse como una matriz de Dobles.
Así es como lo declaró:
Ahora puedo usarlo
SerializableDenseVector
en un conjunto de datos (directamente o como parte de un producto) usando un ExpressionEncoder simple y sin Kryo. Funciona igual que un Breeze DenseVector pero se serializa como una matriz [doble].fuente