Spark: UDF ejecutado muchas veces

9

Tengo un marco de datos con el siguiente código:

def test(lat: Double, lon: Double) = {
  println(s"testing ${lat / lon}")
  Map("one" -> "one", "two" -> "two")
}

val testUDF = udf(test _)

df.withColumn("test", testUDF(col("lat"), col("lon")))
  .withColumn("test1", col("test.one"))
  .withColumn("test2", col("test.two"))

Ahora comprobando los registros, descubrí que para cada fila el UDF se ejecuta 3 veces. Si agrego el "test3" de una columna "test.three", el UDF se ejecuta una vez más.

¿Alguien puede explicarme por qué?

¿Se puede evitar esto correctamente (sin almacenar en caché el marco de datos después de agregar "prueba", incluso si esto funciona)?

Rolintocour
fuente
¿Qué quieres decir? Estás llamando a la función de prueba tres veces. Por eso se está ejecutando tres veces. No estoy seguro de por qué lo estás haciendo un UDF. ¿Por qué no simplemente hacer que el mapa sea val?
user4601931
Este es solo un ejemplo para mostrar el comportamiento de la chispa. Para mí "prueba" es una nueva columna que contiene una estructura, luego acceder a cualquier parte de la estructura no debería ejecutar el UDF nuevamente. Como me equivoco
Rolintocour
Traté de imprimir el esquema, el DataType de "prueba" es Mapy no un Struct. Ahora, en lugar de devolver un Mapa, si la UDF devuelve una clase de caso como Test (una Cadena, dos: Cadena), entonces testes una Estructura pero siempre hay tantas ejecuciones de la UDF.
Rolintocour el
relacionado: stackoverflow.com/questions/40320563/…
Raphael Roth
el almacenamiento en caché debería funcionar de acuerdo con esta respuesta: stackoverflow.com/a/40962714/1138523
Raphael Roth el

Respuestas:

5

Si desea evitar múltiples llamadas a un udf (lo cual es útil especialmente si el udf es un cuello de botella en su trabajo) puede hacerlo de la siguiente manera:

val testUDF = udf(test _).asNondeterministic()

Básicamente le dice a Spark que su función no es determinista y ahora Spark se asegura de que se llame solo una vez porque no es seguro llamarla varias veces (cada llamada podría devolver un resultado diferente).

También tenga en cuenta que este truco no es gratis, al hacer esto está poniendo algunas restricciones en el optimizador, un efecto secundario de esto es, por ejemplo, que el optimizador de chispa no empuja los filtros a través de expresiones que no son deterministas, por lo que se hace responsable del óptimo posición de los filtros en su consulta.

David Vrba
fuente
¡bonito! esta respuesta también pertenece aquí: stackoverflow.com/questions/40320563/…
Raphael Roth
En mi caso, asNondeterministicobliga al UDF a ejecutarse solo una vez. Con la explode(array(myUdf($"id")))solución, todavía se ejecuta dos veces.
Rolintocour