Obtención de un comportamiento extraño al llamar a la función fuera de un cierre:
- cuando la función está en un objeto todo funciona
- cuando la función está en una clase obtener:
Tarea no serializable: java.io.NotSerializableException: prueba
El problema es que necesito mi código en una clase y no en un objeto. ¿Alguna idea de por qué está sucediendo esto? ¿Se serializa un objeto Scala (predeterminado?)?
Este es un ejemplo de código de trabajo:
object working extends App {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
}
Este es el ejemplo que no funciona:
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
scala
serialization
apache-spark
typesafe
Nimrod007
fuente
fuente
Respuestas:
Los RDD extienden la interfaz Serialable , por lo que esto no es lo que está causando que su tarea falle. Ahora esto no significa que pueda serializar un
RDD
con Spark y evitarNotSerializableException
Spark es un motor informático distribuido y su abstracción principal es un conjunto de datos distribuidos ( RDD ) resistente , que puede verse como una colección distribuida. Básicamente, los elementos de RDD se dividen en los nodos del clúster, pero Spark lo abstrae del usuario, permitiendo que el usuario interactúe con el RDD (colección) como si fuera local.
No entrar en demasiados detalles, pero cuando se ejecuta diferentes transformaciones en un RDD (
map
,flatMap
,filter
y otros), el código de transformación (cierre) es:Por supuesto, puede ejecutar esto localmente (como en su ejemplo), pero todas esas fases (aparte del envío a través de la red) aún ocurren. [Esto le permite detectar cualquier error incluso antes de implementarlo en producción]
Lo que sucede en su segundo caso es que está llamando a un método, definido en clase
testing
desde dentro de la función de mapa. Spark ve eso y dado que los métodos no se pueden serializar por sí mismos, Spark intenta serializar toda latesting
clase, de modo que el código seguirá funcionando cuando se ejecute en otra JVM. Tienes dos posibilidades:Puede hacer que las pruebas de clase sean serializables, por lo que Spark puede serializar toda la clase:
o haces una
someFunc
función en lugar de un método (las funciones son objetos en Scala), para que Spark pueda serializarlo:Un problema similar, pero no el mismo, con la serialización de clases puede ser de su interés y puede leerlo en esta presentación de Spark Summit 2013 .
Como nota al margen, puede volver
rddList.map(someFunc(_))
a escribirrddList.map(someFunc)
, son exactamente lo mismo. Por lo general, se prefiere el segundo ya que es menos detallado y más limpio de leer.EDITAR (2015-03-15): SPARK-5307 introdujo SerializationDebugger y Spark 1.3.0 es la primera versión en usarlo. Agrega la ruta de serialización a una excepción NotSerializableException . Cuando se encuentra una excepción NotSerializableException, el depurador visita el gráfico de objetos para encontrar la ruta hacia el objeto que no se puede serializar, y construye información para ayudar al usuario a encontrar el objeto.
En el caso de OP, esto es lo que se imprime en stdout:
fuente
val test = new Test with Serializable
La respuesta de Grega es excelente para explicar por qué el código original no funciona y dos formas de solucionar el problema. Sin embargo, esta solución no es muy flexible; considere el caso en el que su cierre incluye una llamada al método en una no
Serializable
clase sobre la que no tiene control. No puede agregar laSerializable
etiqueta a esta clase ni cambiar la implementación subyacente para cambiar el método a una función.Nilesh presenta una gran solución para esto, pero la solución puede ser más concisa y general:
Este serializador de funciones se puede utilizar para cerrar automáticamente los cierres y las llamadas a métodos:
Esta técnica también tiene el beneficio de no requerir las dependencias adicionales de Shark para acceder
KryoSerializationWrapper
, ya que Chill de Twitter ya está atrapado por el núcleo de Sparkfuente
Charla completa que explica completamente el problema, que propone una gran forma de cambio de paradigma para evitar estos problemas de serialización: https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory- leaks-no-ws.md
La respuesta más votada es básicamente sugerir descartar una función de lenguaje completa, que ya no usa métodos y solo funciones. De hecho, en los métodos de programación funcional en las clases se debe evitar, pero convertirlos en funciones no resuelve el problema del diseño aquí (ver enlace anterior).
Como una solución rápida en esta situación particular, podría usar la
@transient
anotación para decirle que no intente serializar el valor ofensivo (aquí,Spark.ctx
es una clase personalizada, no la de Spark después del nombre de OP):También puede reestructurar el código para que rddList viva en otro lugar, pero eso también es desagradable.
El futuro es probablemente esporas
En el futuro, Scala incluirá estas cosas llamadas "esporas" que deberían permitirnos controlar el grano fino de lo que hace y no es atraído exactamente por un cierre. Además, esto debería convertir todos los errores de extraer accidentalmente los tipos no serializables (o cualquier valor no deseado) en errores de compilación en lugar de ahora, lo que son horribles excepciones de tiempo de ejecución / pérdidas de memoria.
http://docs.scala-lang.org/sips/pending/spores.html
Un consejo sobre la serialización de Kryo
Cuando use kyro, asegúrese de que el registro sea necesario, esto significará que obtendrá errores en lugar de pérdidas de memoria:
"Finalmente, sé que kryo tiene kryo.setRegistrationOptional (verdadero) pero me está costando mucho tratar de descubrir cómo usarlo. Cuando esta opción está activada, kryo parece arrojar excepciones si no me he registrado clases ".
Estrategia para registrar clases con kryo
Por supuesto, esto solo le da control de nivel de tipo, no control de nivel de valor.
... más ideas por venir.
fuente
Resolví este problema usando un enfoque diferente. Simplemente necesita serializar los objetos antes de pasar por el cierre, y des-serializar después. Este enfoque simplemente funciona, incluso si sus clases no son serializables, porque usa Kryo detrás de escena. Todo lo que necesitas es un poco de curry. ;)
Aquí hay un ejemplo de cómo lo hice:
Siéntase libre de hacer que Blah sea tan complicado como desee, clase, objeto complementario, clases anidadas, referencias a múltiples bibliotecas de terceros.
KryoSerializationWrapper se refiere a: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
fuente
KryoSerializationWrapper
, descubrirás que hace que Spark piense que sí lo es,java.io.Serializable
simplemente serializa el objeto internamente usando Kryo, más rápido y más simple. Y no creo que se trate de una instancia estática, solo deserializa el valor cuando se llama a value.apply ().Me enfrenté a un problema similar, y lo que entiendo de la respuesta de Grega es
su método doIT intenta serializar algún método FunF (_) , pero como el método no es serializable, intenta serializar las pruebas de clase que nuevamente no son serializables.
Así que haga que su código funcione, debe definir someFunc dentro del método doIT . Por ejemplo:
Y si aparecen varias funciones, entonces todas esas funciones deberían estar disponibles para el contexto principal.
fuente
No estoy completamente seguro de que esto se aplique a Scala, pero en Java resolví
NotSerializableException
refactorizando mi código para que el cierre no tuviera acceso a unfinal
campo no serializable .fuente
FileWriter
es unfinal
campo de la clase externa, no puedes hacerlo. PeroFileWriter
se puede construir a partir de aString
o aFile
, los cuales sonSerializable
. Así que refactorice su código para construir un localFileWriter
basado en el nombre de archivo de la clase externa.Para su información en Spark 2.4, muchos de ustedes probablemente encontrarán este problema. La serialización de Kryo ha mejorado, pero en muchos casos no puede usar spark.kryo.unsafe = true o el ingenuo serializador de kryo.
Para una solución rápida, intente cambiar lo siguiente en su configuración de Spark
O
Modifico las transformaciones RDD personalizadas que encuentro o escribo personalmente utilizando variables de transmisión explícitas y utilizando la nueva API incorporada de twitter-chill, convirtiéndolas
rdd.map(row =>
enrdd.mapPartitions(partition => {
funciones.Ejemplo
Manera antigua (no genial)
Manera alternativa (mejor)
Esta nueva forma solo llamará a la variable de difusión una vez por partición, lo cual es mejor. Aún necesitará utilizar la serialización de Java si no registra las clases.
fuente