Cambiar el nombre de las columnas de un DataFrame en Spark Scala

93

Estoy tratando de convertir todos los nombres de encabezados / columnas de un DataFrameen Spark-Scala. a partir de ahora, se me ocurre el siguiente código que solo reemplaza el nombre de una sola columna.

for( i <- 0 to origCols.length - 1) {
  df.withColumnRenamed(
    df.columns(i), 
    df.columns(i).toLowerCase
  );
}
Sam
fuente

Respuestas:

239

Si la estructura es plana:

val df = Seq((1L, "a", "foo", 3.0)).toDF
df.printSchema
// root
//  |-- _1: long (nullable = false)
//  |-- _2: string (nullable = true)
//  |-- _3: string (nullable = true)
//  |-- _4: double (nullable = false)

lo más simple que puedes hacer es usar el toDFmétodo:

val newNames = Seq("id", "x1", "x2", "x3")
val dfRenamed = df.toDF(newNames: _*)

dfRenamed.printSchema
// root
// |-- id: long (nullable = false)
// |-- x1: string (nullable = true)
// |-- x2: string (nullable = true)
// |-- x3: double (nullable = false)

Si desea cambiar el nombre de columnas individuales, puede usar selectcon alias:

df.select($"_1".alias("x1"))

que se puede generalizar fácilmente a varias columnas:

val lookup = Map("_1" -> "foo", "_3" -> "bar")

df.select(df.columns.map(c => col(c).as(lookup.getOrElse(c, c))): _*)

o bien withColumnRenamed:

df.withColumnRenamed("_1", "x1")

que usan con foldLeftpara cambiar el nombre de varias columnas:

lookup.foldLeft(df)((acc, ca) => acc.withColumnRenamed(ca._1, ca._2))

Con estructuras anidadas ( structs), una opción posible es cambiar el nombre seleccionando una estructura completa:

val nested = spark.read.json(sc.parallelize(Seq(
    """{"foobar": {"foo": {"bar": {"first": 1.0, "second": 2.0}}}, "id": 1}"""
)))

nested.printSchema
// root
//  |-- foobar: struct (nullable = true)
//  |    |-- foo: struct (nullable = true)
//  |    |    |-- bar: struct (nullable = true)
//  |    |    |    |-- first: double (nullable = true)
//  |    |    |    |-- second: double (nullable = true)
//  |-- id: long (nullable = true)

@transient val foobarRenamed = struct(
  struct(
    struct(
      $"foobar.foo.bar.first".as("x"), $"foobar.foo.bar.first".as("y")
    ).alias("point")
  ).alias("location")
).alias("record")

nested.select(foobarRenamed, $"id").printSchema
// root
//  |-- record: struct (nullable = false)
//  |    |-- location: struct (nullable = false)
//  |    |    |-- point: struct (nullable = false)
//  |    |    |    |-- x: double (nullable = true)
//  |    |    |    |-- y: double (nullable = true)
//  |-- id: long (nullable = true)

Tenga en cuenta que puede afectar a los nullabilitymetadatos. Otra posibilidad es cambiar el nombre lanzando:

nested.select($"foobar".cast(
  "struct<location:struct<point:struct<x:double,y:double>>>"
).alias("record")).printSchema

// root
//  |-- record: struct (nullable = true)
//  |    |-- location: struct (nullable = true)
//  |    |    |-- point: struct (nullable = true)
//  |    |    |    |-- x: double (nullable = true)
//  |    |    |    |-- y: double (nullable = true)

o:

import org.apache.spark.sql.types._

nested.select($"foobar".cast(
  StructType(Seq(
    StructField("location", StructType(Seq(
      StructField("point", StructType(Seq(
        StructField("x", DoubleType), StructField("y", DoubleType)))))))))
).alias("record")).printSchema

// root
//  |-- record: struct (nullable = true)
//  |    |-- location: struct (nullable = true)
//  |    |    |-- point: struct (nullable = true)
//  |    |    |    |-- x: double (nullable = true)
//  |    |    |    |-- y: double (nullable = true)
zero323
fuente
Hola @ zero323 Cuando uso withColumnRenamed, obtengo AnalysisException no puede resolver 'CC8. 1 'columnas de entrada dadas ... Falla a pesar de que CC8.1 está disponible en DataFrame, por favor guíe.
unk1102
@ u449355 No me queda claro si esta es una columna anidada o una que contiene puntos. En el último caso, las comillas invertidas deberían funcionar (al menos en algunos casos básicos).
zero323
1
que : _*)significa endf.select(df.columns.map(c => col(c).as(lookup.getOrElse(c, c))): _*)
Anton Kim
1
Para responder a la pregunta de Anton Kim: el : _*es el llamado operador "splat" de scala. Básicamente, descompone una cosa similar a una matriz en una lista no contenida, lo cual es útil cuando desea pasar la matriz a una función que toma un número arbitrario de argumentos, pero no tiene una versión que toma un List[]. Si está familiarizado con Perl, es la diferencia entre some_function(@my_array) # "splatted"y some_function(\@my_array) # not splatted ... in perl the backslash "\" operator returns a reference to a thing.
Mylo Stone
1
Esta afirmación es realmente oscura para mí df.select(df.columns.map(c => col(c).as(lookup.getOrElse(c, c))): _*). ¿Podría descomponerla por favor? especialmente la lookup.getOrElse(c,c)parte.
Aetos
19

Para aquellos de ustedes interesados ​​en la versión de PySpark (en realidad es lo mismo en Scala - vea el comentario a continuación):

    merchants_df_renamed = merchants_df.toDF(
        'merchant_id', 'category', 'subcategory', 'merchant')

    merchants_df_renamed.printSchema()

Resultado:

root
| - comerciante_id: entero (anulable = verdadero)
| - categoría: cadena (anulable = verdadero)
| - subcategoría: cadena (anulable = verdadero)
| - comerciante: cadena (anulable = verdadero)

Tagar
fuente
1
Con el uso toDF()para cambiar el nombre de las columnas en DataFrame debe tener cuidado. Este método funciona mucho más lento que otros. Tengo un DataFrame que contiene 100 millones de registros y una consulta de recuento simple toma ~ 3 s, mientras que la misma consulta con el toDF()método toma ~ 16 s. Pero cuando uso el select col AS col_newmétodo para cambiar el nombre, obtengo ~ 3s nuevamente. ¡Más de 5 veces más rápido! Spark 2.3.2.3
Ihor Konovalenko
6
def aliasAllColumns(t: DataFrame, p: String = "", s: String = ""): DataFrame =
{
  t.select( t.columns.map { c => t.col(c).as( p + c + s) } : _* )
}

En caso de que no sea obvio, esto agrega un prefijo y un sufijo a cada uno de los nombres de columna actuales. Esto puede ser útil cuando tiene dos tablas con una o más columnas que tienen el mismo nombre, y desea unirlas, pero aún puede eliminar la ambigüedad de las columnas en la tabla resultante. Seguro que sería bueno si hubiera una forma similar de hacer esto en SQL "normal".

Mylo Stone
fuente
me gusta seguro, agradable y elegante
thebluephantom
1

Suponga que el marco de datos df tiene 3 columnas id1, name1, price1 y desea cambiarles el nombre a id2, name2, price2

val list = List("id2", "name2", "price2")
import spark.implicits._
val df2 = df.toDF(list:_*)
df2.columns.foreach(println)

Encontré este enfoque útil en muchos casos.

Jagadeesh Verri
fuente
0

unirse a la mesa de remolque no cambiar el nombre de la clave unida

// method 1: create a new DF
day1 = day1.toDF(day1.columns.map(x => if (x.equals(key)) x else s"${x}_d1"): _*)

// method 2: use withColumnRenamed
for ((x, y) <- day1.columns.filter(!_.equals(key)).map(x => (x, s"${x}_d1"))) {
    day1 = day1.withColumnRenamed(x, y)
}

¡trabajos!

Colin Wang
fuente
0
Sometime we have the column name is below format in SQLServer or MySQL table

Ex  : Account Number,customer number

But Hive tables do not support column name containing spaces, so please use below solution to rename your old column names.

Solution:

val renamedColumns = df.columns.map(c => df(c).as(c.replaceAll(" ", "_").toLowerCase()))
df = df.select(renamedColumns: _*)
R. RamkumarYugo
fuente