Spark Dataframe distingue columnas con nombre duplicado

82

Entonces, como sé en Spark Dataframe, que para múltiples columnas puede tener el mismo nombre que se muestra en la siguiente instantánea del marco de datos:

[
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=125231, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=145831, f=SparseVector(5, {0: 0.0, 1: 0.2356, 2: 0.0036, 3: 0.0, 4: 0.4132})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=147031, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=149231, f=SparseVector(5, {0: 0.0, 1: 0.0032, 2: 0.2451, 3: 0.0, 4: 0.0042}))
]

El resultado anterior se crea al unirse con un marco de datos a sí mismo, puede ver que hay 4columnas con dos ay f.

El problema es que cuando trato de hacer más cálculos con la acolumna, no puedo encontrar una manera de seleccionar a, lo intenté df[0]y df.select('a')ambos me devolvieron el siguiente error:

AnalysisException: Reference 'a' is ambiguous, could be: a#1333L, a#1335L.

¿Hay alguna forma en la API de Spark que pueda distinguir las columnas de los nombres duplicados nuevamente? ¿O quizás alguna forma de permitirme cambiar los nombres de las columnas?

resec
fuente

Respuestas:

61

Le recomendaría que cambie los nombres de las columnas para su join.

df1.select(col("a") as "df1_a", col("f") as "df1_f")
   .join(df2.select(col("a") as "df2_a", col("f") as "df2_f"), col("df1_a" === col("df2_a"))

El resultante DataFrametendráschema

(df1_a, df1_f, df2_a, df2_f)
Glennie Helles Sindholt
fuente
5
Es posible que deba corregir su respuesta, ya que las comillas no se ajustan correctamente entre los nombres de las columnas.
Sameh Sharaf
2
@SamehSharaf ¿Asumo que eres tú quien vota en contra de mi respuesta? Pero la respuesta es, de hecho, 100% correcta: simplemente estoy usando scala '-shorthand para la selección de columnas, por lo que de hecho no hay ningún problema con las comillas.
Glennie Helles Sindholt
31
@GlennieHellesSindholt, punto justo. Es confuso porque la respuesta está etiquetada como pythony pyspark.
Jorge Leitao
¿Qué pasa si cada marco de datos contiene más de 100 columnas y solo necesitamos cambiar el nombre de una columna que es el mismo? Seguramente, no puedo escribir manualmente todos esos nombres de columna en la cláusula de selección
bikashg
6
En ese caso, podría ir condf1.withColumnRenamed("a", "df1_a")
Glennie Helles Sindholt
100

Comencemos con algunos datos:

from pyspark.mllib.linalg import SparseVector
from pyspark.sql import Row

df1 = sqlContext.createDataFrame([
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
    Row(a=125231, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
])

df2 = sqlContext.createDataFrame([
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
])

Hay algunas formas de abordar este problema. En primer lugar, puede hacer referencia inequívocamente a las columnas de la tabla secundaria utilizando columnas principales:

df1.join(df2, df1['a'] == df2['a']).select(df1['f']).show(2)

##  +--------------------+
##  |                   f|
##  +--------------------+
##  |(5,[0,1,2,3,4],[0...|
##  |(5,[0,1,2,3,4],[0...|
##  +--------------------+

También puede usar alias de tablas:

from pyspark.sql.functions import col

df1_a = df1.alias("df1_a")
df2_a = df2.alias("df2_a")

df1_a.join(df2_a, col('df1_a.a') == col('df2_a.a')).select('df1_a.f').show(2)

##  +--------------------+
##  |                   f|
##  +--------------------+
##  |(5,[0,1,2,3,4],[0...|
##  |(5,[0,1,2,3,4],[0...|
##  +--------------------+

Finalmente, puede cambiar el nombre de las columnas mediante programación:

df1_r = df1.select(*(col(x).alias(x + '_df1') for x in df1.columns))
df2_r = df2.select(*(col(x).alias(x + '_df2') for x in df2.columns))

df1_r.join(df2_r, col('a_df1') == col('a_df2')).select(col('f_df1')).show(2)

## +--------------------+
## |               f_df1|
## +--------------------+
## |(5,[0,1,2,3,4],[0...|
## |(5,[0,1,2,3,4],[0...|
## +--------------------+
zero323
fuente
7
Gracias por su edición por mostrar tantas formas de obtener la columna correcta en esos casos ambiguos, creo que sus ejemplos deberían incluirse en la guía de programación de Spark. ¡He aprendido mucho!
resec
pequeña corrección: en df2_r = **df2** .select(*(col(x).alias(x + '_df2') for x in df2.columns))lugar de df2_r = df1.select(*(col(x).alias(x + '_df2') for x in df2.columns)). Por lo demás, buenas cosas
Vzzarr
Estoy de acuerdo con que esto debería ser parte de la guía de programación de Spark. Oro puro. Finalmente pude desenredar la fuente de ambigüedad seleccionando columnas por los nombres antiguos antes de realizar la combinación. La solución de agregar sufijos mediante programación a los nombres de las columnas antes de realizar la combinación elimina toda la ambigüedad.
Pablo Adames
7

Puede usar el def drop(col: Column)método para eliminar la columna duplicada, por ejemplo:

DataFrame:df1

+-------+-----+
| a     | f   |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+

DataFrame:df2

+-------+-----+
| a     | f   |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+

cuando me uno a df1 con df2, el DataFrame será el siguiente:

val newDf = df1.join(df2,df1("a")===df2("a"))

DataFrame:newDf

+-------+-----+-------+-----+
| a     | f   | a     | f   |
+-------+-----+-------+-----+
|107831 | ... |107831 | ... |
|107831 | ... |107831 | ... |
+-------+-----+-------+-----+

Ahora, podemos usar el def drop(col: Column)método para eliminar la columna duplicada 'a' o 'f', como sigue:

val newDfWithoutDuplicate = df1.join(df2,df1("a")===df2("a")).drop(df2("a")).drop(df2("f"))
StrongYoung
fuente
¿Funcionaría este enfoque si está haciendo una combinación externa y las dos columnas tienen valores diferentes?
prafi
Es posible que no desee descartar relaciones diferentes con el mismo esquema.
thebluephantom
5

Después de profundizar en la API de Spark, descubrí que primero puedo usar aliaspara crear un alias para el marco de datos original, luego uso withColumnRenamedpara cambiar manualmente el nombre de cada columna en el alias, esto hará eljoin sin causar la duplicación del nombre de la columna.

Se pueden consultar más detalles a continuación, la API de Spark Dataframe :

pyspark.sql.DataFrame.alias

pyspark.sql.DataFrame.withColumnRenamed

Sin embargo, creo que esto es solo una solución problemática y me pregunto si hay alguna manera mejor para mi pregunta.

resec
fuente
4

Así es como podemos unir dos Dataframes en los mismos nombres de columna en PySpark.

df = df1.join(df2, ['col1','col2','col3'])

Si lo hace printSchema()después de esto, puede ver que se han eliminado las columnas duplicadas.

Nikhil Redij
fuente
3

Suponga que los DataFrames que desea unir son df1 y df2, y los está uniendo en la columna 'a', entonces tiene 2 métodos

Método 1

df1.join (df2, 'a', 'left_outer')

Este es un método asombroso y es muy recomendable.

Método 2

df1.join (df2, df1.a == df2.a, 'left_outer'). drop (df2.a)

Typhoonbxq
fuente
1

Puede que este no sea el mejor enfoque, pero si desea cambiar el nombre de las columnas duplicadas (después de unirse), puede hacerlo usando esta pequeña función.

def rename_duplicate_columns(dataframe):
    columns = dataframe.columns
    duplicate_column_indices = list(set([columns.index(col) for col in columns if columns.count(col) == 2]))
    for index in duplicate_column_indices:
        columns[index] = columns[index]+'2'
    dataframe = dataframe.toDF(*columns)
    return dataframe
Akash
fuente
1

si solo la columna clave es la misma en ambas tablas, intente utilizar la siguiente forma (Método 1):

left. join(right , 'key', 'inner')

en lugar de debajo (enfoque 2):

left. join(right , left.key == right.key, 'inner')

Ventajas de usar el enfoque 1:

  • la 'clave' se mostrará solo una vez en el marco de datos final
  • fácil de usar la sintaxis

Contras de usar el enfoque 1:

  • solo ayuda con la columna clave
  • Escenarios, en los que el caso de combinación izquierda, si planea usar el recuento nulo de clave derecha, esto no funcionará. En ese caso, uno tiene que cambiar el nombre de una de las claves como se mencionó anteriormente.
Manish Singla
fuente
0

Si tiene un caso de uso más complicado que el descrito en la respuesta de Glennie Helles Sindholt, por ejemplo, tiene otros / pocos nombres de columna que no se unen que también son iguales y desea distinguirlos mientras selecciona, es mejor usar alias, por ejemplo:

df3 = df1.select("a", "b").alias("left")\
   .join(df2.select("a", "b").alias("right"), ["a"])\
   .select("left.a", "left.b", "right.b")

df3.columns
['a', 'b', 'b']
Wassermann
fuente