No puede agregar una columna arbitraria a un DataFrame
en Spark. Las nuevas columnas solo se pueden crear usando literales (otros tipos de literales se describen en ¿Cómo agregar una columna constante en un Spark DataFrame? )
from pyspark.sql.functions import lit
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()
## +---+---+-----+---+
## | x1| x2| x3| x4|
## +---+---+-----+---+
## | 1| a| 23.0| 0|
## | 3| B|-23.0| 0|
## +---+---+-----+---+
transformando una columna existente:
from pyspark.sql.functions import exp
df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()
## +---+---+-----+---+--------------------+
## | x1| x2| x3| x4| x5|
## +---+---+-----+---+--------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9|
## | 3| B|-23.0| 0|1.026187963170189...|
## +---+---+-----+---+--------------------+
incluido usando join
:
from pyspark.sql.functions import exp
lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
.join(lookup, col("x1") == col("k"), "leftouter")
.drop("k")
.withColumnRenamed("v", "x6"))
## +---+---+-----+---+--------------------+----+
## | x1| x2| x3| x4| x5| x6|
## +---+---+-----+---+--------------------+----+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|
## | 3| B|-23.0| 0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+
o generado con la función / udf:
from pyspark.sql.functions import rand
df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()
## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2| x3| x4| x5| x6| x7|
## +---+---+-----+---+--------------------+----+-------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|0.41930610446846617|
## | 3| B|-23.0| 0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+
En términos de rendimiento, las funciones integradas ( pyspark.sql.functions
), que se asignan a la expresión de Catalyst, generalmente se prefieren a las funciones definidas por el usuario de Python.
Si desea agregar contenido de un RDD arbitrario como columna, puede
Para agregar una columna usando un UDF:
fuente
Para Spark 2.0
fuente
df = df.select('*', (df.age + 10).alias('agePlusTen'))
que está efectivamente agregando una columna arbitraria como @ zero323 nos advirtió anteriormente era imposible, a menos que haya algo malo en hacer esto en Spark, en las pandas es la manera estándar ..df.select('*', df.age + 10, df.age + 20)
Hay varias formas en que podemos agregar una nueva columna en pySpark.
Primero creemos un DataFrame simple.
Ahora intentemos duplicar el valor de la columna y almacenarlo en una nueva columna. PFB pocos enfoques diferentes para lograr lo mismo.
Para obtener más ejemplos y explicaciones sobre las funciones de DataFrame, puede visitar mi blog .
Espero que esto ayude.
fuente
Puede definir un nuevo
udf
al agregar uncolumn_name
:fuente
fuente
StringType()
.Me gustaría ofrecer un ejemplo generalizado para un caso de uso muy similar:
Caso de uso: tengo un csv que consta de:
Necesito realizar algunas transformaciones y el csv final debe verse como
Necesito hacer esto porque este es el esquema definido por algún modelo y necesito que mis datos finales sean interoperables con inserciones masivas de SQL y esas cosas.
entonces:
1) Leí el csv original usando spark.read y lo llamo "df".
2) Hago algo a los datos.
3) Agrego las columnas nulas usando este script:
De esta manera, puede estructurar su esquema después de cargar un csv (también funcionaría para reordenar columnas si tiene que hacer esto para muchas tablas).
fuente
La forma más sencilla de agregar una columna es usar "withColumn". Dado que el marco de datos se crea utilizando sqlContext, debe especificar el esquema o, de forma predeterminada, puede estar disponible en el conjunto de datos. Si se especifica el esquema, la carga de trabajo se vuelve tediosa al cambiar cada vez.
A continuación se muestra un ejemplo que puede considerar:
fuente
Podemos agregar columnas adicionales a DataFrame directamente con los siguientes pasos:
fuente