Tengo un marco de datos con el siguiente código:
def test(lat: Double, lon: Double) = {
println(s"testing ${lat / lon}")
Map("one" -> "one", "two" -> "two")
}
val testUDF = udf(test _)
df.withColumn("test", testUDF(col("lat"), col("lon")))
.withColumn("test1", col("test.one"))
.withColumn("test2", col("test.two"))
Ahora comprobando los registros, descubrí que para cada fila el UDF se ejecuta 3 veces. Si agrego el "test3" de una columna "test.three", el UDF se ejecuta una vez más.
¿Alguien puede explicarme por qué?
¿Se puede evitar esto correctamente (sin almacenar en caché el marco de datos después de agregar "prueba", incluso si esto funciona)?
scala
apache-spark
apache-spark-sql
Rolintocour
fuente
fuente
Map
y no un Struct. Ahora, en lugar de devolver un Mapa, si la UDF devuelve una clase de caso como Test (una Cadena, dos: Cadena), entoncestest
es una Estructura pero siempre hay tantas ejecuciones de la UDF.Respuestas:
Si desea evitar múltiples llamadas a un udf (lo cual es útil especialmente si el udf es un cuello de botella en su trabajo) puede hacerlo de la siguiente manera:
Básicamente le dice a Spark que su función no es determinista y ahora Spark se asegura de que se llame solo una vez porque no es seguro llamarla varias veces (cada llamada podría devolver un resultado diferente).
También tenga en cuenta que este truco no es gratis, al hacer esto está poniendo algunas restricciones en el optimizador, un efecto secundario de esto es, por ejemplo, que el optimizador de chispa no empuja los filtros a través de expresiones que no son deterministas, por lo que se hace responsable del óptimo posición de los filtros en su consulta.
fuente
asNondeterministic
obliga al UDF a ejecutarse solo una vez. Con laexplode(array(myUdf($"id")))
solución, todavía se ejecuta dos veces.