¿Cómo agregar una columna constante en un Spark DataFrame?

137

Quiero agregar una columna en un DataFramevalor arbitrario (que es lo mismo para cada fila). Me sale un error cuando uso de la withColumnsiguiente manera:

dt.withColumn('new_column', 10).head(5)
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-50-a6d0257ca2be> in <module>()
      1 dt = (messages
      2     .select(messages.fromuserid, messages.messagetype, floor(messages.datetime/(1000*60*5)).alias("dt")))
----> 3 dt.withColumn('new_column', 10).head(5)

/Users/evanzamir/spark-1.4.1/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col)
   1166         [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
   1167         """
-> 1168         return self.select('*', col.alias(colName))
   1169 
   1170     @ignore_unicode_prefix

AttributeError: 'int' object has no attribute 'alias'

Parece que puedo engañar a la función para que funcione como quiero al sumar y restar una de las otras columnas (para que sumen a cero) y luego sumar el número que quiero (10 en este caso):

dt.withColumn('new_column', dt.messagetype - dt.messagetype + 10).head(5)
[Row(fromuserid=425, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=47019141, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=49746356, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=93506471, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=80488242, messagetype=1, dt=4809600.0, new_column=10)]

Esto es supremamente hacky, ¿verdad? ¿Asumo que hay una forma más legítima de hacer esto?

Evan Zamir
fuente

Respuestas:

221

Spark 2.2+

Spark 2.2 presenta typedLitsoporte Seq, Mapy Tuples( SPARK-19254 ) y las siguientes llamadas deben ser compatibles (Scala):

import org.apache.spark.sql.functions.typedLit

df.withColumn("some_array", typedLit(Seq(1, 2, 3)))
df.withColumn("some_struct", typedLit(("foo", 1, 0.3)))
df.withColumn("some_map", typedLit(Map("key1" -> 1, "key2" -> 2)))

Spark 1.3+ ( lit), 1.4+ ( array, struct), 2.0+ ( map):

El segundo argumento para DataFrame.withColumndebería ser un, Columnasí que debes usar un literal:

from pyspark.sql.functions import lit

df.withColumn('new_column', lit(10))

Si necesita columnas complejas, puede construirlas usando bloques como array:

from pyspark.sql.functions import array, create_map, struct

df.withColumn("some_array", array(lit(1), lit(2), lit(3)))
df.withColumn("some_struct", struct(lit("foo"), lit(1), lit(.3)))
df.withColumn("some_map", create_map(lit("key1"), lit(1), lit("key2"), lit(2)))

Se pueden usar exactamente los mismos métodos en Scala.

import org.apache.spark.sql.functions.{array, lit, map, struct}

df.withColumn("new_column", lit(10))
df.withColumn("map", map(lit("key1"), lit(1), lit("key2"), lit(2)))

Para proporcionar nombres para structsusar aliasen cada campo:

df.withColumn(
    "some_struct",
    struct(lit("foo").alias("x"), lit(1).alias("y"), lit(0.3).alias("z"))
 )

o casten todo el objeto

df.withColumn(
    "some_struct", 
    struct(lit("foo"), lit(1), lit(0.3)).cast("struct<x: string, y: integer, z: double>")
 )

También es posible, aunque más lento, usar un UDF.

Nota :

Se pueden usar las mismas construcciones para pasar argumentos constantes a UDF o funciones SQL.

cero323
fuente
1
Para otros que usan esto para implementar ... el método withColumn devuelve un nuevo DataFrame agregando una columna o reemplazando la columna existente que tiene el mismo nombre, por lo que deberá reasignar los resultados a df o asignarlos a una nueva variable. Por ejemplo, `df = df.withColumn ('new_column', lit (10)) '
Even Mien
con cada iteración, ¿podemos cambiar los valores dentro de la columna? Ya he intentado esto for i in range(len(item)) : df.withColumn('new_column', lit({}).format(i)) pero esto no funciona
Tracy
30

En spark 2.2 hay dos formas de agregar valor constante en una columna en DataFrame:

1) Usando lit

2) Usando typedLit .

La diferencia entre los dos es que typedLit también puede manejar tipos de escala parametrizados, por ejemplo, Lista, Seq y Mapa

Marco de datos de muestra:

val df = spark.createDataFrame(Seq((0,"a"),(1,"b"),(2,"c"))).toDF("id", "col1")

+---+----+
| id|col1|
+---+----+
|  0|   a|
|  1|   b|
+---+----+

1) Uso lit: Agregar valor de cadena constante en una nueva columna llamada newcol:

import org.apache.spark.sql.functions.lit
val newdf = df.withColumn("newcol",lit("myval"))

Resultado:

+---+----+------+
| id|col1|newcol|
+---+----+------+
|  0|   a| myval|
|  1|   b| myval|
+---+----+------+

2) Usando typedLit:

import org.apache.spark.sql.functions.typedLit
df.withColumn("newcol", typedLit(("sample", 10, .044)))

Resultado:

+---+----+-----------------+
| id|col1|           newcol|
+---+----+-----------------+
|  0|   a|[sample,10,0.044]|
|  1|   b|[sample,10,0.044]|
|  2|   c|[sample,10,0.044]|
+---+----+-----------------+
Ayush Vatsyayan
fuente
¿Podría compartir la versión completa junto con la declaración de importación
Ayush Vatsyayan
Versión de chispa 2.2.1. la declaración de importación es de pyspark.sql.functions import typedLit. También probé el que compartiste arriba.
braj