Concatenar columnas en Apache Spark DataFrame

Respuestas:

175

Con SQL sin formato puede utilizar CONCAT:

  • En Python

    df = sqlContext.createDataFrame([("foo", 1), ("bar", 2)], ("k", "v"))
    df.registerTempTable("df")
    sqlContext.sql("SELECT CONCAT(k, ' ',  v) FROM df")
  • En Scala

    import sqlContext.implicits._
    
    val df = sc.parallelize(Seq(("foo", 1), ("bar", 2))).toDF("k", "v")
    df.registerTempTable("df")
    sqlContext.sql("SELECT CONCAT(k, ' ',  v) FROM df")

Desde Spark 1.5.0 puede usar la concatfunción con la API de DataFrame:

  • En Python:

    from pyspark.sql.functions import concat, col, lit
    
    df.select(concat(col("k"), lit(" "), col("v")))
  • En Scala:

    import org.apache.spark.sql.functions.{concat, lit}
    
    df.select(concat($"k", lit(" "), $"v"))

También hay una concat_wsfunción que toma un separador de cadenas como primer argumento.

zero323
fuente
46

Así es como puede personalizar los nombres

import pyspark
from pyspark.sql import functions as sf
sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)
df = sqlc.createDataFrame([('row11','row12'), ('row21','row22')], ['colname1', 'colname2'])
df.show()

da,

+--------+--------+
|colname1|colname2|
+--------+--------+
|   row11|   row12|
|   row21|   row22|
+--------+--------+

crear una nueva columna concatenando:

df = df.withColumn('joined_column', 
                    sf.concat(sf.col('colname1'),sf.lit('_'), sf.col('colname2')))
df.show()

+--------+--------+-------------+
|colname1|colname2|joined_column|
+--------+--------+-------------+
|   row11|   row12|  row11_row12|
|   row21|   row22|  row21_row22|
+--------+--------+-------------+
muon
fuente
4
litcrea una columna de_
muon
34

Una opción para concatenar columnas de cadenas en Spark Scala está usando concat.

Es necesario verificar los valores nulos . Porque si una de las columnas es nula, el resultado será nulo incluso si una de las otras columnas tiene información.

Usando concaty withColumn:

val newDf =
  df.withColumn(
    "NEW_COLUMN",
    concat(
      when(col("COL1").isNotNull, col("COL1")).otherwise(lit("null")),
      when(col("COL2").isNotNull, col("COL2")).otherwise(lit("null"))))

Usando concaty select:

val newDf = df.selectExpr("concat(nvl(COL1, ''), nvl(COL2, '')) as NEW_COLUMN")

Con ambos enfoques, tendrá un NEW_COLUMN cuyo valor es una concatenación de las columnas: COL1 y COL2 de su df original.

Ignacio Alorre
fuente
1
Probé su método en pyspark pero no funcionó, advirtiendo "col debería ser Column".
Samson
@Samson lo siento, solo verifiqué la API de Scala
Ignacio Alorre
3
@IgnacioAlorre Si está usando en concat_wslugar de concat, puede evitar verificar NULL.
Aswath K
18

Si desea hacerlo usando DF, puede usar un udf para agregar una nueva columna basada en columnas existentes.

val sqlContext = new SQLContext(sc)
case class MyDf(col1: String, col2: String)

//here is our dataframe
val df = sqlContext.createDataFrame(sc.parallelize(
    Array(MyDf("A", "B"), MyDf("C", "D"), MyDf("E", "F"))
))

//Define a udf to concatenate two passed in string values
val getConcatenated = udf( (first: String, second: String) => { first + " " + second } )

//use withColumn method to add a new column called newColName
df.withColumn("newColName", getConcatenated($"col1", $"col2")).select("newColName", "col1", "col2").show()
Danés shrestha
fuente
12

Desde Spark 2.3 ( SPARK-22771 ) Spark SQL admite el operador de concatenación ||.

Por ejemplo;

val df = spark.sql("select _c1 || _c2 as concat_column from <table_name>")
Krishas
fuente
10

Aquí hay otra forma de hacer esto para pyspark:

#import concat and lit functions from pyspark.sql.functions 
from pyspark.sql.functions import concat, lit

#Create your data frame
countryDF = sqlContext.createDataFrame([('Ethiopia',), ('Kenya',), ('Uganda',), ('Rwanda',)], ['East Africa'])

#Use select, concat, and lit functions to do the concatenation
personDF = countryDF.select(concat(countryDF['East Africa'], lit('n')).alias('East African'))

#Show the new data frame
personDF.show()

----------RESULT-------------------------

84
+------------+
|East African|
+------------+
|   Ethiopian|
|      Kenyan|
|     Ugandan|
|     Rwandan|
+------------+
Teddy Belay
fuente
7

Aquí hay una sugerencia para cuando no conoce el número o el nombre de las columnas en el marco de datos.

val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*))
wones0120
fuente
4

concat (* cols)

v1.5 y superior

Concatena varias columnas de entrada en una sola columna. La función trabaja con cadenas, columnas de matriz binarias y compatibles.

P.ej: new_df = df.select(concat(df.a, df.b, df.c))


concat_ws (sep, * cols)

v1.5 y superior

Similar a, concatpero usa el separador especificado.

P.ej: new_df = df.select(concat_ws('-', df.col1, df.col2))


map_concat (* cols)

v2.4 y superior

Usado para concatizar mapas, devuelve la unión de todos los mapas dados.

P.ej: new_df = df.select(map_concat("map1", "map2"))


Usando el operador de cadena concat ( ||):

v2.3 y superior

P.ej: df = spark.sql("select col_a || col_b || col_c as abc from table_x")

Referencia: Spark sql doc

Ani Menon
fuente
2

En Spark 2.3.0, puede hacer:

spark.sql( """ select '1' || column_a from table_a """)
Charlie 木匠
fuente
1

En Java, puede hacer esto para concatenar varias columnas. El código de muestra es para proporcionarle un escenario y cómo usarlo para una mejor comprensión.

SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
Dataset<Row> reducedInventory = spark.sql("select * from table_name")
                        .withColumn("concatenatedCol",
                                concat(col("col1"), lit("_"), col("col2"), lit("_"), col("col3")));


class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;

    public static SparkSession getInstance(SparkConf sparkConf) {
        if (instance == null) {
            instance = SparkSession.builder().config(sparkConf)
                    .getOrCreate();
        }
        return instance;
    }
}

El código anterior concatenó col1, col2, col3 separados por "_" para crear una columna con el nombre "concatenatedCol".

Wandermonk
fuente
1

¿Tenemos la sintaxis de Java correspondiente al proceso a continuación?

val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*))
Roopesh MB
fuente
0

Otra forma de hacerlo en pySpark usando sqlContext ...

#Suppose we have a dataframe:
df = sqlContext.createDataFrame([('row1_1','row1_2')], ['colname1', 'colname2'])

# Now we can concatenate columns and assign the new column a name 
df = df.select(concat(df.colname1, df.colname2).alias('joined_colname'))
Gur
fuente
0

De hecho, hay algunas hermosas abstracciones incorporadas para que pueda realizar su concatenación sin la necesidad de implementar una función personalizada. Como mencionaste Spark SQL, supongo que estás intentando pasarlo como un comando declarativo a través de spark.sql (). Si es así, puede lograr de una manera sencilla pasando un comando SQL como: SELECT CONCAT(col1, '<delimiter>', col2, ...) AS concat_column_name FROM <table_name>;

Además, desde Spark 2.3.0, puede usar comandos en líneas con: SELECT col1 || col2 AS concat_column_name FROM <table_name>;

Donde, es su delimitador preferido (también puede ser un espacio vacío) y es la tabla temporal o permanente desde la que está tratando de leer.


fuente
0

También podemos usar SelectExpr de forma simple. df1.selectExpr ("*", "superior (_2 || _3) como nuevo")

Deepak Saxena
fuente