Fusionar múltiples marcos de datos en fila en PySpark

21

Tengo 10 marcos de datos pyspark.sql.dataframe.DataFrame, obtenidos de randomSplitcomo (td1, td2, td3, td4, td5, td6, td7, td8, td9, td10) = td.randomSplit([.1, .1, .1, .1, .1, .1, .1, .1, .1, .1], seed = 100)Ahora quiero unir 9 tden un solo marco de datos, ¿cómo debo hacer eso?

Ya lo he intentado unionAll, pero esta función solo acepta dos argumentos.

td1_2 = td1.unionAll(td2) 
# this is working fine

td1_2_3 = td1.unionAll(td2, td3) 
# error TypeError: unionAll() takes exactly 2 arguments (3 given)

¿Hay alguna forma de combinar más de dos marcos de datos en fila?

El propósito de hacer esto es que estoy haciendo la validación cruzada 10 veces manualmente sin usar el CrossValidatormétodo PySpark , así que tomo 9 en el entrenamiento y 1 en los datos de prueba y luego lo repetiré para otras combinaciones.

Krishna Prasad
fuente
1
Esto no responde directamente a la pregunta, pero aquí sugiero mejorar el método de nomenclatura para que, al final, no tengamos que escribir, por ejemplo: [td1, td2, td3, td4, td5, td6, td7 , td8, td9, td10]. Imagina hacer esto por un CV de 100 veces. Esto es lo que haré: porciones = [0.1] * 10 cv = df7.randomSplit (porciones) pliegues = lista (rango (10)) para i en rango (10): test_data = cv [i] fold_no_i = pliegues [: i] + pliegues [i + 1:] train_data = cv [fold_no_i [0]] para j en fold_no_i [1:]: train_data = train_data.union (cv [j])
ngoc thoag

Respuestas:

37

Robado de: /programming/33743978/spark-union-of-multiple-rdds

Fuera de encadenar sindicatos, esta es la única forma de hacerlo para DataFrames.

from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)

unionAll(td2, td3, td4, td5, td6, td7, td8, td9, td10)

Lo que sucede es que toma todos los objetos que pasó como parámetros y los reduce usando unionAll (esta reducción es de Python, no la reducción de Spark aunque funcionan de manera similar), lo que finalmente lo reduce a un DataFrame.

Si en lugar de DataFrames son RDD normales, puede pasar una lista de ellos a la función de unión de su SparkContext

EDITAR: para su propósito, propongo un método diferente, ya que tendría que repetir esta unión completa 10 veces para sus diferentes pliegues para la validación cruzada, agregaría etiquetas para qué pliegue pertenece una fila y simplemente filtrar su DataFrame para cada pliegue según la etiqueta

Jan van der Vegt
fuente
(+1) Una buena solución. Sin embargo, debe existir una función que permita la concatenación de múltiples marcos de datos. ¡Sería bastante útil!
Dawny33
No estoy en desacuerdo con eso
Jan van der Vegt
@ JanvanderVegt Gracias, funciona y la idea de agregar etiquetas para filtrar el conjunto de datos de entrenamiento y prueba, ya lo hice. Muchas gracias por su ayuda.
Krishna Prasad
@Jan van der Vegt ¿Puede aplicar la misma lógica para unirse y responder esta pregunta
GeorgeOfTheRF
6

En algún momento, cuando los marcos de datos para combinar no tienen el mismo orden de columnas, es mejor df2.select (df1.columns) para asegurar que ambos df tengan el mismo orden de columnas antes de la unión.

import functools 

def unionAll(dfs):
    return functools.reduce(lambda df1,df2: df1.union(df2.select(df1.columns)), dfs) 

Ejemplo:

df1 = spark.createDataFrame([[1,1],[2,2]],['a','b'])
# different column order. 
df2 = spark.createDataFrame([[3,333],[4,444]],['b','a']) 
df3 = spark.createDataFrame([555,5],[666,6]],['b','a']) 

unioned_df = unionAll([df1, df2, df3])
unioned_df.show() 

ingrese la descripción de la imagen aquí

de lo contrario, generaría el siguiente resultado en su lugar.

from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs) 

unionAll(*[df1, df2, df3]).show()

ingrese la descripción de la imagen aquí

Wong Tat Yau
fuente
2

¿Qué tal usar la recursividad?

def union_all(dfs):
    if len(dfs) > 1:
        return dfs[0].unionAll(union_all(dfs[1:]))
    else:
        return dfs[0]

td = union_all([td1, td2, td3, td4, td5, td6, td7, td8, td9, td10])
proinsias
fuente