¿Cómo puedo convertir un RDD ( org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
) a un marco de datos org.apache.spark.sql.DataFrame
? Convertí un marco de datos a rdd usando .rdd
. Después de procesarlo, lo quiero de vuelta en el marco de datos. Cómo puedo hacer esto ?
scala
apache-spark
apache-spark-sql
rdd
usuario568109
fuente
fuente
Respuestas:
SqlContext
tiene una serie decreateDataFrame
métodos que crean unDataFrame
dadoRDD
. Me imagino que uno de estos funcionará para su contexto.Por ejemplo:
fuente
Este código funciona perfectamente desde Spark 2.x con Scala 2.11
Importar clases necesarias
Crear
SparkSession
objeto, y aquí estáspark
Vamos
RDD
a hacerloDataFrame
Método 1
Utilizando
SparkSession.createDataFrame(RDD obj)
.Método 2
Usar
SparkSession.createDataFrame(RDD obj)
y especificar nombres de columna.Método 3 (respuesta real a la pregunta)
De esta manera, la entrada
rdd
debe ser de tipoRDD[Row]
.crear el esquema
Ahora aplique ambos
rowsRdd
yschema
acreateDataFrame()
fuente
Suponiendo que su RDD [fila] se llama rdd, puede usar:
fuente
Nota: esta respuesta se publicó originalmente aquí
Estoy publicando esta respuesta porque me gustaría compartir detalles adicionales sobre las opciones disponibles que no encontré en las otras respuestas
Para crear un DataFrame a partir de un RDD de filas, hay dos opciones principales:
1) Como ya se señaló, puede utilizar el
toDF()
que puede importarimport sqlContext.implicits._
. Sin embargo, este enfoque solo funciona para los siguientes tipos de RDD:RDD[Int]
RDD[Long]
RDD[String]
RDD[T <: scala.Product]
(fuente: Scaladoc del
SQLContext.implicits
objeto)La última firma en realidad significa que puede funcionar para un RDD de tuplas o un RDD de clases de casos (porque las tuplas y las clases de casos son subclases de
scala.Product
).Entonces, para usar este enfoque para un
RDD[Row]
, debes mapearlo a unRDD[T <: scala.Product]
. Esto se puede hacer asignando cada fila a una clase de caso personalizada o a una tupla, como en los siguientes fragmentos de código:o
El principal inconveniente de este enfoque (en mi opinión) es que debe establecer explícitamente el esquema del DataFrame resultante en la función de mapa, columna por columna. Tal vez esto se pueda hacer programáticamente si no conoce el esquema de antemano, pero las cosas pueden ponerse un poco confusas allí. Entonces, alternativamente, hay otra opción:
2) Puede usar
createDataFrame(rowRDD: RDD[Row], schema: StructType)
como en la respuesta aceptada, que está disponible en el objeto SQLContext . Ejemplo para convertir un RDD de un antiguo DataFrame:Tenga en cuenta que no es necesario establecer explícitamente ninguna columna de esquema. Reutilizamos el antiguo esquema del DF, que es de
StructType
clase y puede ampliarse fácilmente. Sin embargo, este enfoque a veces no es posible, y en algunos casos puede ser menos eficiente que el primero.fuente
import sqlContext.implicits.
Suponga que tiene un
DataFrame
y desea realizar alguna modificación en los datos de los campos convirtiéndolos enRDD[Row]
.Para volver a convertir
DataFrame
desdeRDD
, necesitamos definir el tipo de estructura deRDD
.Si el tipo de datos era
Long
entonces, se volverá comoLongType
en la estructura.Si
String
entoncesStringType
en estructura.Ahora puede convertir el RDD a DataFrame utilizando el método createDataFrame .
fuente
Aquí hay un ejemplo simple de convertir su Lista en Spark RDD y luego convertir ese Spark RDD en Dataframe.
Tenga en cuenta que he usado la REPL scala de Spark-shell para ejecutar el siguiente código, Aquí sc es una instancia de SparkContext que está implícitamente disponible en Spark-shell. Espero que responda tu pregunta.
fuente
Método 1: (Scala)
Método 2: (Scala)
Método 1: (Python)
Método 2: (Python)
Extrajo el valor del objeto de fila y luego aplicó la clase de caso para convertir rdd a DF
fuente
En versiones más nuevas de spark (2.0+)
fuente
Suponiendo que val spark es un producto de un SparkSession.builder ...
Los mismos pasos, pero con menos declaraciones val:
fuente
Traté de explicar la solución usando el problema de conteo de palabras . 1. Lea el archivo usando sc
Métodos para crear DF
Leer archivo usando chispa
Rdd a Dataframe
val df = sc.textFile ("D: // cca175 / data /") .toDF ("t1") df.show
Método 1
Crear el recuento de palabras RDD a Dataframe
Método 2
Crear marco de datos desde Rdd
Método 3
Definir esquema
import org.apache.spark.sql.types._
esquema de val = nuevo StructType (). add (StructField ("palabra", StringType, true)). add (StructField ("cuenta", StringType, verdadero))
Crear RowRDD
Crear DataFrame desde RDD con esquema
val df = spark.createDataFrame (rowRdd, esquema)
df.show
fuente
Para convertir un Array [Row] a DataFrame o Dataset, lo siguiente funciona con elegancia:
Digamos que el esquema es el StructType para la fila, luego
fuente