¿Cómo comprobar si el marco de datos de Spark está vacío?

101

En este momento, tengo que usar df.count > 0para verificar si DataFrameestá vacío o no. Pero es algo ineficiente. ¿Hay alguna forma mejor de hacerlo?

Gracias.

PD: quiero verificar si está vacío para que solo guarde el DataFramesi no está vacío

auxdx
fuente

Respuestas:

154

Para Spark 2.1.0, mi sugerencia sería usar head(n: Int)o take(n: Int)con isEmpty, el que tenga la intención más clara para ti.

df.head(1).isEmpty
df.take(1).isEmpty

con el equivalente de Python:

len(df.head(1)) == 0  # or bool(df.head(1))
len(df.take(1)) == 0  # or bool(df.take(1))

El uso de df.first()y df.head()devolverá el java.util.NoSuchElementExceptionsi el DataFrame está vacío. first()llama head()directamente, que llama head(1).head.

def first(): T = head()
def head(): T = head(1).head

head(1)devuelve un Array, por lo que asumir headese Array hace java.util.NoSuchElementExceptionque el DataFrame esté vacío.

def head(n: Int): Array[T] = withAction("head", limit(n).queryExecution)(collectFromPlan)

Entonces, en lugar de llamar head(), use head(1)directamente para obtener la matriz y luego puede usar isEmpty.

take(n)también es equivalente a head(n)...

def take(n: Int): Array[T] = head(n)

Y limit(1).collect()es equivalente a head(1)(aviso limit(n).queryExecutionen el head(n: Int)método), por lo que los siguientes son todos equivalentes, al menos por lo que puedo decir, y no tendrá que detectar una java.util.NoSuchElementExceptionexcepción cuando el DataFrame esté vacío.

df.head(1).isEmpty
df.take(1).isEmpty
df.limit(1).collect().isEmpty

Sé que esta es una pregunta anterior, así que espero que ayude a alguien que use una versión más nueva de Spark.

hulin003
fuente
19
Para aquellos que usan pyspark. isEmpty no es una cosa. Do len (d.head (1))> 0 en su lugar.
AntiPawn79
4
¿Por qué es esto mejor entonces df.rdd.isEmpty?
Dan Ciborowski - MSFT
1
df.head (1) .isEmpty está tomando mucho tiempo, ¿hay alguna otra solución optimizada para esto?
Rakesh Sabbani
1
Hola @Rakesh Sabbani, si df.head(1)está tomando una gran cantidad de tiempo, probablemente se deba a que su dfplan de ejecución está haciendo algo complicado que evita que Spark tome atajos. Por ejemplo, si solo está leyendo archivos de parquet df = spark.read.parquet(...), estoy bastante seguro de que Spark solo leerá una partición de archivo. Pero si dfestá haciendo otras cosas como agregaciones, es posible que, sin darse cuenta, esté obligando a Spark a leer y procesar una gran parte, si no todos, de sus datos de origen.
hulin003
solo reportando mi experiencia a EVITAR: estaba usando df.limit(1).count()ingenuamente. En grandes conjuntos de datos, lleva mucho más tiempo que los ejemplos informados por @ hulin003, que son casi instantáneos
Vzzarr
45

Yo diría que solo tome el subyacente RDD. En Scala:

df.rdd.isEmpty

en Python:

df.rdd.isEmpty()

Dicho esto, todo lo que esto hace es llamar take(1).length, por lo que hará lo mismo que respondió Rohan ... ¿quizás un poco más explícito?

Justin Pihony
fuente
6
Esto es sorprendentemente más lento que df.count () == 0 en mi caso
arquitectónico
2
¿No es la conversión a rdd una tarea pesada?
Alok
1
Realmente no. Los RDD siguen siendo la base de todo Spark en su mayor parte.
Justin Pihony
28
No convierta el df a RDD. Ralentiza el proceso. Si lo convierte, convertirá DF completo a RDD y verificará si está vacío. Piense que si DF tiene millones de filas, lleva mucho tiempo convertirlo a RDD.
Nandakishore
3
.rdd ralentiza tanto el proceso como mucho
Raul H
14

Puede aprovechar las funciones head()(o first()) para ver si DataFrametiene una sola fila. Si es así, no está vacío.

Rohan Aletty
fuente
10
si el marco de datos está vacío, arroja "java.util.NoSuchElementException: siguiente en el iterador vacío"; [Spark 1.3.1]
FelixHo
6

Si lo hace df.count > 0. Toma los recuentos de todas las particiones en todos los ejecutores y los suma en Driver. Esto lleva un tiempo cuando se trata de millones de filas.

La mejor manera de hacer esto es realizar df.take(1)y verificar si es nulo. Esto volverá java.util.NoSuchElementExceptionasí que es mejor intentarlo df.take(1).

El marco de datos devuelve un error cuando take(1)se realiza en lugar de una fila vacía. He resaltado las líneas de código específicas donde arroja el error.

ingrese la descripción de la imagen aquí

Nandakishore
fuente
1
si ejecuta esto en un marco de datos masivo con millones de registros, ese countmétodo llevará algún tiempo.
TheM00s3
2
Dije lo mismo, no estoy seguro de por qué dio un pulgar hacia abajo.
Nandakishore
tu derecho dijiste lo mismo, desafortunadamente, no te rechacé.
TheM00s3
Ohhh está bien. Lo siento TheMoos3, pero quienquiera que lo haya hecho, observe la respuesta y comprenda el concepto.
Nandakishore
el uso de df.take (1) cuando el df está vacío da como resultado la obtención de una FILA vacía que no se puede comparar con nula
LetsPlayYahtzee
6

Desde Spark 2.4.0 existe Dataset.isEmpty.

Su implementación es:

def isEmpty: Boolean = 
  withAction("isEmpty", limit(1).groupBy().count().queryExecution) { plan =>
    plan.executeCollect().head.getLong(0) == 0
}

Tenga en cuenta que a DataFrameya no es una clase en Scala, es solo un alias de tipo (probablemente cambiado con Spark 2.0):

type DataFrame = Dataset[Row]
Berilio
fuente
1
isEmpty es más lento que df.head (1) .isEmpty
Sandeep540
@ Sandeep540 ¿De verdad? ¿Punto de referencia? Su propuesta crea una instancia de al menos una fila. La implementación de Spark solo transporta un número. head () también está usando limit (), groupBy () realmente no está haciendo nada, es necesario obtener un RelationalGroupedDataset que a su vez proporciona count (). Entonces eso no debería ser significativamente más lento. Probablemente sea más rápido en el caso de un conjunto de datos que contiene muchas columnas (posiblemente datos anidados desnormalizados). De todos modos, debe escribir menos :-)
Beryllium
5

Para los usuarios de Java, puede usar esto en un conjunto de datos:

public boolean isDatasetEmpty(Dataset<Row> ds) {
        boolean isEmpty;
        try {
            isEmpty = ((Row[]) ds.head(1)).length == 0;
        } catch (Exception e) {
            return true;
        }
        return isEmpty;
}

Esto verifica todos los escenarios posibles (vacío, nulo).

Abdennacer Lachiheb
fuente
3

En Scala, puede usar implícitos para agregar los métodos isEmpty()y nonEmpty()la API de DataFrame, lo que hará que el código sea un poco más agradable de leer.

object DataFrameExtensions {
  implicit def extendedDataFrame(dataFrame: DataFrame): ExtendedDataFrame = 
    new ExtendedDataFrame(dataFrame: DataFrame)

  class ExtendedDataFrame(dataFrame: DataFrame) {
    def isEmpty(): Boolean = dataFrame.head(1).isEmpty // Any implementation can be used
    def nonEmpty(): Boolean = !isEmpty
  }
}

Aquí, también se pueden agregar otros métodos. Para usar la conversión implícita, use import DataFrameExtensions._en el archivo que desea usar la funcionalidad extendida. Posteriormente, los métodos se pueden utilizar directamente de la siguiente manera:

val df: DataFrame = ...
if (df.isEmpty) {
  // Do something
}
Shaido - Reincorporar a Monica
fuente
2

Tenía la misma pregunta y probé 3 soluciones principales:

  1. df! = null df.count> 0
  2. df.head (1) .isEmpty () como sugiere @ hulin003
  3. df.rdd.isEmpty como sugiere @Justin Pihony

y, por supuesto, los 3 funcionan, sin embargo, en términos de rendimiento, esto es lo que encontré, al ejecutar estos métodos en el mismo DF en mi máquina, en términos de tiempo de ejecución:

  1. se necesitan ~ 9366ms
  2. se necesitan ~ 5607ms
  3. toma ~ 1921ms

por lo tanto, creo que la mejor solución es df.rdd.isEmpty como sugiere @Justin Pihony

un nombre
fuente
1
la opción 3 lleva menos tiempo, ¿por qué la segunda?
thinkman
Vaya, tienes razón, estoy usando el tercero, actualizo la respuesta
aName
por curiosidad ... ¿con qué tamaño de DataFrames se probó?
aiguofer
1

Encontré que en algunos casos:

>>>print(type(df))
<class 'pyspark.sql.dataframe.DataFrame'>

>>>df.take(1).isEmpty
'list' object has no attribute 'isEmpty'

esto es lo mismo para "longitud" o reemplace take () por head ()

[Solución] para el problema que podemos utilizar.

>>>df.limit(2).count() > 1
False
Shekhar Koirala
fuente
1

Si está utilizando Pypsark, también puede hacer:

len(df.head(1)) > 0
Adelholzener
fuente
1

En PySpark, también puede utilizar este bool(df.head(1))para obtener una Truede Falsevalor

Devuelve Falsesi el marco de datos no contiene filas

Bose
fuente
0
df1.take(1).length>0

El takemétodo devuelve la matriz de filas, por lo que si el tamaño de la matriz es igual a cero, no hay registros en df.

Gopi A
fuente
-1

dataframe.limit(1).count > 0

Esto también desencadena un trabajo, pero como estamos seleccionando un solo registro, incluso en el caso de miles de millones de registros de escala, el consumo de tiempo podría ser mucho menor.

De: https://medium.com/checking-emptiness-in-distributed-objects/count-vs-isempty-surprised-to-see-the-impact-fa70c0246ee0

Jordan Morris
fuente
Todas estas son malas opciones que toman casi el mismo tiempo
Pushpendra Jaiswal
@PushpendraJaiswal sí, y en un mundo de malas opciones, deberíamos elegir la mejor mala opción
Jordan Morris
-2

Puedes hacerlo como:

val df = sqlContext.emptyDataFrame
if( df.eq(sqlContext.emptyDataFrame) )
    println("empty df ")
else 
    println("normal df")
sYer Wang
fuente
1
¿No requerirá que schemados marcos de datos ( sqlContext.emptyDataFrame& df) sean iguales para volver alguna vez true?
y2k-shubham
1
Esto no funcionará. eqse hereda AnyRefy prueba si el argumento (que) es una referencia al objeto receptor (esto).
Alper t. Turker