Tarea no serializable: java.io.NotSerializableException cuando se llama a la función fuera del cierre solo en clases, no en objetos

224

Obtención de un comportamiento extraño al llamar a la función fuera de un cierre:

  • cuando la función está en un objeto todo funciona
  • cuando la función está en una clase obtener:

Tarea no serializable: java.io.NotSerializableException: prueba

El problema es que necesito mi código en una clase y no en un objeto. ¿Alguna idea de por qué está sucediendo esto? ¿Se serializa un objeto Scala (predeterminado?)?

Este es un ejemplo de código de trabajo:

object working extends App {
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure 
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))
}

Este es el ejemplo que no funciona:

object NOTworking extends App {
  new testing().doIT
}

//adding extends Serializable wont help
class testing {  
  val list = List(1,2,3)  
  val rddList = Spark.ctx.parallelize(list)

  def doIT =  {
    //again calling the fucntion someFunc 
    val after = rddList.map(someFunc(_))
    //this will crash (spark lazy)
    after.collect().map(println(_))
  }

  def someFunc(a:Int) = a+1
}
Nimrod007
fuente
¿Qué es Spark.ctx? No hay ningún objeto Spark con el método ctx AFAICT
javadba

Respuestas:

334

Los RDD extienden la interfaz Serialable , por lo que esto no es lo que está causando que su tarea falle. Ahora esto no significa que pueda serializar un RDDcon Spark y evitarNotSerializableException

Spark es un motor informático distribuido y su abstracción principal es un conjunto de datos distribuidos ( RDD ) resistente , que puede verse como una colección distribuida. Básicamente, los elementos de RDD se dividen en los nodos del clúster, pero Spark lo abstrae del usuario, permitiendo que el usuario interactúe con el RDD (colección) como si fuera local.

No entrar en demasiados detalles, pero cuando se ejecuta diferentes transformaciones en un RDD ( map, flatMap, filtery otros), el código de transformación (cierre) es:

  1. serializado en el nodo del controlador,
  2. enviado a los nodos apropiados en el clúster,
  3. deserializado,
  4. y finalmente ejecutado en los nodos

Por supuesto, puede ejecutar esto localmente (como en su ejemplo), pero todas esas fases (aparte del envío a través de la red) aún ocurren. [Esto le permite detectar cualquier error incluso antes de implementarlo en producción]

Lo que sucede en su segundo caso es que está llamando a un método, definido en clase testingdesde dentro de la función de mapa. Spark ve eso y dado que los métodos no se pueden serializar por sí mismos, Spark intenta serializar toda la testing clase, de modo que el código seguirá funcionando cuando se ejecute en otra JVM. Tienes dos posibilidades:

Puede hacer que las pruebas de clase sean serializables, por lo que Spark puede serializar toda la clase:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test extends java.io.Serializable {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  def someFunc(a: Int) = a + 1
}

o haces una someFuncfunción en lugar de un método (las funciones son objetos en Scala), para que Spark pueda serializarlo:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  val someFunc = (a: Int) => a + 1
}

Un problema similar, pero no el mismo, con la serialización de clases puede ser de su interés y puede leerlo en esta presentación de Spark Summit 2013 .

Como nota al margen, puede volver rddList.map(someFunc(_))a escribir rddList.map(someFunc), son exactamente lo mismo. Por lo general, se prefiere el segundo ya que es menos detallado y más limpio de leer.

EDITAR (2015-03-15): SPARK-5307 introdujo SerializationDebugger y Spark 1.3.0 es la primera versión en usarlo. Agrega la ruta de serialización a una excepción NotSerializableException . Cuando se encuentra una excepción NotSerializableException, el depurador visita el gráfico de objetos para encontrar la ruta hacia el objeto que no se puede serializar, y construye información para ayudar al usuario a encontrar el objeto.

En el caso de OP, esto es lo que se imprime en stdout:

Serialization stack:
    - object not serializable (class: testing, value: testing@2dfe2f00)
    - field (class: testing$$anonfun$1, name: $outer, type: class testing)
    - object (class testing$$anonfun$1, <function1>)
Grega Kešpret
fuente
1
Hmm, lo que has explicado ciertamente tiene sentido, y explica por qué toda la clase se serializa (algo que no entendí completamente). Sin embargo, aún sostendré que los rdd no son serializables (bueno, extienden Serializable, pero eso no significa que no causen NotSerializableException, pruébelo). Por eso, si los coloca fuera de las clases, corrige el error. Voy a editar mi respuesta un poco para ser más preciso sobre lo que quiero decir, es decir, causan la excepción, no que extiendan la interfaz.
samthebest
35
En caso de que no tenga control sobre la clase, debe ser serializable ... si está usando Scala, puede crear una instancia con Serializable:val test = new Test with Serializable
Mark S
44
"rddList.map (someFunc (_)) a rddList.map (someFunc), son exactamente lo mismo" No, no son exactamente lo mismo, y de hecho el uso de este último puede causar excepciones de serialización si el primero no lo fuera.
samthebest
1
@samthebest, ¿podría explicar por qué map (someFunc (_)) no causaría excepciones de serialización mientras que map (someFunc) sí?
Alon
31

La respuesta de Grega es excelente para explicar por qué el código original no funciona y dos formas de solucionar el problema. Sin embargo, esta solución no es muy flexible; considere el caso en el que su cierre incluye una llamada al método en una no Serializableclase sobre la que no tiene control. No puede agregar la Serializableetiqueta a esta clase ni cambiar la implementación subyacente para cambiar el método a una función.

Nilesh presenta una gran solución para esto, pero la solución puede ser más concisa y general:

def genMapper[A, B](f: A => B): A => B = {
  val locker = com.twitter.chill.MeatLocker(f)
  x => locker.get.apply(x)
}

Este serializador de funciones se puede utilizar para cerrar automáticamente los cierres y las llamadas a métodos:

rdd map genMapper(someFunc)

Esta técnica también tiene el beneficio de no requerir las dependencias adicionales de Shark para acceder KryoSerializationWrapper, ya que Chill de Twitter ya está atrapado por el núcleo de Spark

Ben Sidhom
fuente
Hola, me pregunto si necesito registrar algo si uso tu código. Intenté y obtuve una excepción de clase Noble find de kryo. THX
G_cy
25

Charla completa que explica completamente el problema, que propone una gran forma de cambio de paradigma para evitar estos problemas de serialización: https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory- leaks-no-ws.md

La respuesta más votada es básicamente sugerir descartar una función de lenguaje completa, que ya no usa métodos y solo funciones. De hecho, en los métodos de programación funcional en las clases se debe evitar, pero convertirlos en funciones no resuelve el problema del diseño aquí (ver enlace anterior).

Como una solución rápida en esta situación particular, podría usar la @transientanotación para decirle que no intente serializar el valor ofensivo (aquí, Spark.ctxes una clase personalizada, no la de Spark después del nombre de OP):

@transient
val rddList = Spark.ctx.parallelize(list)

También puede reestructurar el código para que rddList viva en otro lugar, pero eso también es desagradable.

El futuro es probablemente esporas

En el futuro, Scala incluirá estas cosas llamadas "esporas" que deberían permitirnos controlar el grano fino de lo que hace y no es atraído exactamente por un cierre. Además, esto debería convertir todos los errores de extraer accidentalmente los tipos no serializables (o cualquier valor no deseado) en errores de compilación en lugar de ahora, lo que son horribles excepciones de tiempo de ejecución / pérdidas de memoria.

http://docs.scala-lang.org/sips/pending/spores.html

Un consejo sobre la serialización de Kryo

Cuando use kyro, asegúrese de que el registro sea necesario, esto significará que obtendrá errores en lugar de pérdidas de memoria:

"Finalmente, sé que kryo tiene kryo.setRegistrationOptional (verdadero) pero me está costando mucho tratar de descubrir cómo usarlo. Cuando esta opción está activada, kryo parece arrojar excepciones si no me he registrado clases ".

Estrategia para registrar clases con kryo

Por supuesto, esto solo le da control de nivel de tipo, no control de nivel de valor.

... más ideas por venir.

samthebest
fuente
9

Resolví este problema usando un enfoque diferente. Simplemente necesita serializar los objetos antes de pasar por el cierre, y des-serializar después. Este enfoque simplemente funciona, incluso si sus clases no son serializables, porque usa Kryo detrás de escena. Todo lo que necesitas es un poco de curry. ;)

Aquí hay un ejemplo de cómo lo hice:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
               (foo: Foo) : Bar = {
    kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()

object Blah(abc: ABC) extends (Foo => Bar) {
    def apply(foo: Foo) : Bar = { //This is the real function }
}

Siéntase libre de hacer que Blah sea tan complicado como desee, clase, objeto complementario, clases anidadas, referencias a múltiples bibliotecas de terceros.

KryoSerializationWrapper se refiere a: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

Nilesh
fuente
¿Esto realmente serializa la instancia o crea una instancia estática y serializa una referencia (vea mi respuesta)?
samthebest
2
@samthebest, ¿podrías elaborar? Si investigas KryoSerializationWrapper, descubrirás que hace que Spark piense que sí lo es, java.io.Serializablesimplemente serializa el objeto internamente usando Kryo, más rápido y más simple. Y no creo que se trate de una instancia estática, solo deserializa el valor cuando se llama a value.apply ().
Nilesh
8

Me enfrenté a un problema similar, y lo que entiendo de la respuesta de Grega es

object NOTworking extends App {
 new testing().doIT
}
//adding extends Serializable wont help
class testing {

val list = List(1,2,3)

val rddList = Spark.ctx.parallelize(list)

def doIT =  {
  //again calling the fucntion someFunc 
  val after = rddList.map(someFunc(_))
  //this will crash (spark lazy)
  after.collect().map(println(_))
}

def someFunc(a:Int) = a+1

}

su método doIT intenta serializar algún método FunF (_) , pero como el método no es serializable, intenta serializar las pruebas de clase que nuevamente no son serializables.

Así que haga que su código funcione, debe definir someFunc dentro del método doIT . Por ejemplo:

def doIT =  {
 def someFunc(a:Int) = a+1
  //function definition
 }
 val after = rddList.map(someFunc(_))
 after.collect().map(println(_))
}

Y si aparecen varias funciones, entonces todas esas funciones deberían estar disponibles para el contexto principal.

Tarang Bhalodia
fuente
7

No estoy completamente seguro de que esto se aplique a Scala, pero en Java resolví NotSerializableExceptionrefactorizando mi código para que el cierre no tuviera acceso a un finalcampo no serializable .

Trebor grosero
fuente
Estoy enfrentando el mismo problema en Java, estoy tratando de usar la clase FileWriter del paquete Java IO dentro del método RDD foreach. ¿Me puede decir cómo podemos resolver esto?
Shankar
1
Bueno @Shankar, si FileWriteres un finalcampo de la clase externa, no puedes hacerlo. Pero FileWriterse puede construir a partir de a Stringo a File, los cuales son Serializable. Así que refactorice su código para construir un local FileWriterbasado en el nombre de archivo de la clase externa.
Trebor Rude
0

Para su información en Spark 2.4, muchos de ustedes probablemente encontrarán este problema. La serialización de Kryo ha mejorado, pero en muchos casos no puede usar spark.kryo.unsafe = true o el ingenuo serializador de kryo.

Para una solución rápida, intente cambiar lo siguiente en su configuración de Spark

spark.kryo.unsafe="false"

O

spark.serializer="org.apache.spark.serializer.JavaSerializer"

Modifico las transformaciones RDD personalizadas que encuentro o escribo personalmente utilizando variables de transmisión explícitas y utilizando la nueva API incorporada de twitter-chill, convirtiéndolas rdd.map(row =>en rdd.mapPartitions(partition => {funciones.

Ejemplo

Manera antigua (no genial)

val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val outputRDD = rdd.map(row => {
    val value = sampleMap.get(row._1)
    value
})

Manera alternativa (mejor)

import com.twitter.chill.MeatLocker
val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val brdSerSampleMap = spark.sparkContext.broadcast(MeatLocker(sampleMap))

rdd.mapPartitions(partition => {
    val deSerSampleMap = brdSerSampleMap.value.get
    partition.map(row => {
        val value = sampleMap.get(row._1)
        value
    }).toIterator
})

Esta nueva forma solo llamará a la variable de difusión una vez por partición, lo cual es mejor. Aún necesitará utilizar la serialización de Java si no registra las clases.

Iglesia Gabe
fuente