¿Cómo concatenamos dos columnas en un DataFrame de Apache Spark? ¿Hay alguna función en Spark SQL que podamos usar?
116
¿Cómo concatenamos dos columnas en un DataFrame de Apache Spark? ¿Hay alguna función en Spark SQL que podamos usar?
Con SQL sin formato puede utilizar CONCAT
:
En Python
df = sqlContext.createDataFrame([("foo", 1), ("bar", 2)], ("k", "v"))
df.registerTempTable("df")
sqlContext.sql("SELECT CONCAT(k, ' ', v) FROM df")
En Scala
import sqlContext.implicits._
val df = sc.parallelize(Seq(("foo", 1), ("bar", 2))).toDF("k", "v")
df.registerTempTable("df")
sqlContext.sql("SELECT CONCAT(k, ' ', v) FROM df")
Desde Spark 1.5.0 puede usar la concat
función con la API de DataFrame:
En Python:
from pyspark.sql.functions import concat, col, lit
df.select(concat(col("k"), lit(" "), col("v")))
En Scala:
import org.apache.spark.sql.functions.{concat, lit}
df.select(concat($"k", lit(" "), $"v"))
También hay una concat_ws
función que toma un separador de cadenas como primer argumento.
Así es como puede personalizar los nombres
import pyspark
from pyspark.sql import functions as sf
sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)
df = sqlc.createDataFrame([('row11','row12'), ('row21','row22')], ['colname1', 'colname2'])
df.show()
da,
+--------+--------+
|colname1|colname2|
+--------+--------+
| row11| row12|
| row21| row22|
+--------+--------+
crear una nueva columna concatenando:
df = df.withColumn('joined_column',
sf.concat(sf.col('colname1'),sf.lit('_'), sf.col('colname2')))
df.show()
+--------+--------+-------------+
|colname1|colname2|joined_column|
+--------+--------+-------------+
| row11| row12| row11_row12|
| row21| row22| row21_row22|
+--------+--------+-------------+
lit
crea una columna de_
Una opción para concatenar columnas de cadenas en Spark Scala está usando
concat
.Es necesario verificar los valores nulos . Porque si una de las columnas es nula, el resultado será nulo incluso si una de las otras columnas tiene información.
Usando
concat
ywithColumn
:Usando
concat
yselect
:Con ambos enfoques, tendrá un NEW_COLUMN cuyo valor es una concatenación de las columnas: COL1 y COL2 de su df original.
fuente
concat_ws
lugar deconcat
, puede evitar verificar NULL.Si desea hacerlo usando DF, puede usar un udf para agregar una nueva columna basada en columnas existentes.
fuente
Desde Spark 2.3 ( SPARK-22771 ) Spark SQL admite el operador de concatenación
||
.Por ejemplo;
fuente
Aquí hay otra forma de hacer esto para pyspark:
fuente
Aquí hay una sugerencia para cuando no conoce el número o el nombre de las columnas en el marco de datos.
fuente
concat (* cols)
v1.5 y superior
Concatena varias columnas de entrada en una sola columna. La función trabaja con cadenas, columnas de matriz binarias y compatibles.
P.ej:
new_df = df.select(concat(df.a, df.b, df.c))
concat_ws (sep, * cols)
v1.5 y superior
Similar a,
concat
pero usa el separador especificado.P.ej:
new_df = df.select(concat_ws('-', df.col1, df.col2))
map_concat (* cols)
v2.4 y superior
Usado para concatizar mapas, devuelve la unión de todos los mapas dados.
P.ej:
new_df = df.select(map_concat("map1", "map2"))
Usando el operador de cadena concat (
||
):v2.3 y superior
P.ej:
df = spark.sql("select col_a || col_b || col_c as abc from table_x")
Referencia: Spark sql doc
fuente
En Spark 2.3.0, puede hacer:
fuente
En Java, puede hacer esto para concatenar varias columnas. El código de muestra es para proporcionarle un escenario y cómo usarlo para una mejor comprensión.
El código anterior concatenó col1, col2, col3 separados por "_" para crear una columna con el nombre "concatenatedCol".
fuente
¿Tenemos la sintaxis de Java correspondiente al proceso a continuación?
fuente
Otra forma de hacerlo en pySpark usando sqlContext ...
fuente
De hecho, hay algunas hermosas abstracciones incorporadas para que pueda realizar su concatenación sin la necesidad de implementar una función personalizada. Como mencionaste Spark SQL, supongo que estás intentando pasarlo como un comando declarativo a través de spark.sql (). Si es así, puede lograr de una manera sencilla pasando un comando SQL como:
SELECT CONCAT(col1, '<delimiter>', col2, ...) AS concat_column_name FROM <table_name>;
Además, desde Spark 2.3.0, puede usar comandos en líneas con:
SELECT col1 || col2 AS concat_column_name FROM <table_name>;
Donde, es su delimitador preferido (también puede ser un espacio vacío) y es la tabla temporal o permanente desde la que está tratando de leer.
fuente
También podemos usar SelectExpr de forma simple. df1.selectExpr ("*", "superior (_2 || _3) como nuevo")
fuente