¿Cómo guardar DataFrame directamente en Hive?

85

¿Es posible guardar DataFrameen Spark directamente en Hive?

He tratado con la conversión DataFramede Rdda continuación, guardar como archivo de texto y luego cargar en la colmena. Pero me pregunto si puedo guardar directamente dataframeen colmena.

Gourav
fuente

Respuestas:

116

Puede crear una tabla temporal en memoria y almacenarla en una tabla de colmena usando sqlContext.

Digamos que su marco de datos es myDf. Puede crear una tabla temporal usando,

myDf.createOrReplaceTempView("mytempTable") 

Luego, puede usar una declaración de colmena simple para crear una tabla y volcar los datos de su tabla temporal.

sqlContext.sql("create table mytable as select * from mytempTable");
Vinay Kumar
fuente
2
esto que todo el parquet de errores de lectura que estaba recibiendo cuando se utiliza en write.saveAsTable chispa 2,0
ski_squaw
2
Sí, sin embargo, podemos usar la partición en el marco de datos antes de crear la tabla temporal. @chhantyal
Vinay Kumar
1
¿Cómo pudiste mezclar y combinar la temporarymesa con la hivemesa? Al hacerlo show tables, solo incluye las hivetablas para mi spark 2.3.0instalación
StephenBoesch
1
esta tabla temporal se guardará en su contexto de colmena y no pertenece a las tablas de colmena de ninguna manera.
Vinay Kumar
1
hola @VinayKumar por qué dice "Si está usando saveAsTable (es más como persistir su marco de datos), debe asegurarse de tener suficiente memoria asignada a su aplicación Spark". ¿podrías explicar este punto?
enneppi
27

Utilice DataFrameWriter.saveAsTable. ( df.write.saveAsTable(...)) Consulte la Guía de Spark SQL y DataFrame .

Daniel Darabos
fuente
4
saveAsTable no crea tablas compatibles con Hive. La mejor solución que encontré es Vinay Kumar.
RChat
@Jacek: Yo mismo agregué esta nota, porque creo que mi respuesta es incorrecta. Lo eliminaría, salvo que sea aceptado. ¿Crees que la nota está mal?
Daniel Darabos
Si. La nota estaba mal y por eso la quité. "Por favor corríjame si me equivoco" se aplica aquí :)
Jacek Laskowski
1
¿Esto df.write().saveAsTable(tableName) también escribirá datos de transmisión en la tabla?
user1870400
1
no, no puede guardar datos de transmisión con saveAsTable, ni siquiera está en la API
Brian
20

No veo df.write.saveAsTable(...)obsoleto en la documentación de Spark 2.0. Nos ha funcionado en Amazon EMR. Fuimos perfectamente capaces de leer datos de S3 en un marco de datos, procesarlos, crear una tabla a partir del resultado y leerlos con MicroStrategy. Sin embargo, la respuesta de Vinay también ha funcionado.

Alex
fuente
5
Alguien marcó esta respuesta como de baja calidad debido a su extensión y contenido. Para ser honesto, probablemente hubiera sido mejor como comentario. Supongo que ha estado funcionando durante dos años y algunas personas lo han encontrado útil, así que ¿sería bueno dejar las cosas como están?
serakfalcon
Estoy de acuerdo, comentar hubiera sido la mejor opción. Lección aprendida :-)
Alex
15

necesitas tener / crear un HiveContext

import org.apache.spark.sql.hive.HiveContext;

HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());

Luego guarde directamente el marco de datos o seleccione las columnas para almacenar como tabla de colmena

df es marco de datos

df.write().mode("overwrite").saveAsTable("schemaName.tableName");

o

df.select(df.col("col1"),df.col("col2"), df.col("col3")) .write().mode("overwrite").saveAsTable("schemaName.tableName");

o

df.write().mode(SaveMode.Overwrite).saveAsTable("dbName.tableName");

Los modos de guardado son Agregar / Ignorar / Sobrescribir / ErrorIfExists

Agregué aquí la definición de HiveContext de Spark Documentation,

Además del SQLContext básico, también puede crear un HiveContext, que proporciona un superconjunto de la funcionalidad proporcionada por el SQLContext básico. Las características adicionales incluyen la capacidad de escribir consultas utilizando el analizador de HiveQL más completo, acceso a UDF de Hive y la capacidad de leer datos de tablas de Hive. Para usar un HiveContext, no es necesario tener una configuración de Hive existente, y todas las fuentes de datos disponibles para un SQLContext todavía están disponibles. HiveContext solo se empaqueta por separado para evitar incluir todas las dependencias de Hive en la compilación predeterminada de Spark.


en Spark versión 1.6.2, el uso de "dbName.tableName" da este error:

org.apache.spark.sql.AnalysisException: no se permite especificar el nombre de la base de datos u otros calificadores para las tablas temporales. Si el nombre de la tabla tiene puntos (.), Cite el nombre de la tabla con comillas invertidas () .`

Anandkumar
fuente
Es el segundo comando: 'df.select (df.col ("col1"), df.col ("col2"), df.col ("col3")) .write (). Mode ("sobreescribir"). SaveAsTable ("schemaName.tableName"); ' ¿requiere que las columnas seleccionadas que desea sobrescribir ya existan en la tabla? Entonces, ¿tiene la tabla existente y solo sobrescribe las columnas existentes 1, 2, 3 con los nuevos datos de su df en Spark? ¿Eso se interpreta bien?
dieHellste
3
df.write().mode...necesita ser cambiado adf.write.mode...
usuario 923227
8

Guardar en Hive es solo una cuestión de usar el write()método de su SQLContext:

df.write.saveAsTable(tableName)

Ver https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/DataFrameWriter.html#saveAsTable(java.lang.String)

Desde Spark 2.2: use DataSet en lugar de DataFrame.

Raktotpal Bordoloi
fuente
Parece que tengo un error que indica que el trabajo se canceló. Probé el siguiente código pyspark_df.write.mode ("sobreescribir"). SaveAsTable ("InjuryTab2")
Sade
¡Hola! ¿por qué esto? From Spark 2.2: use DataSet instead DataFrame.
onofricamila
3

Lamento escribir tarde a la publicación, pero no veo una respuesta aceptada.

df.write().saveAsTablearrojará AnalysisExceptiony no es compatible con la mesa HIVE.

¡Almacenar DF ​​como df.write().format("hive")debería ser el truco!

Sin embargo, si eso no funciona, siguiendo los comentarios y respuestas anteriores, esta es la mejor solución en mi opinión (aunque abierta a sugerencias).

El mejor enfoque es crear explícitamente la tabla HIVE (incluida la tabla PARTICIONADA),

def createHiveTable: Unit ={
spark.sql("CREATE TABLE $hive_table_name($fields) " +
  "PARTITIONED BY ($partition_column String) STORED AS $StorageType")
}

guardar DF ​​como tabla temporal,

df.createOrReplaceTempView("$tempTableName")

e inserte en la tabla PARTITIONED HIVE:

spark.sql("insert into table default.$hive_table_name PARTITION($partition_column) select * from $tempTableName")
spark.sql("select * from default.$hive_table_name").show(1000,false)

Por supuesto, la ÚLTIMA COLUMNA en DF será la COLUMNA DE PARTICIÓN así que cree la tabla HIVE en consecuencia!

¡Por favor comente si funciona! o no.


--ACTUALIZAR--

df.write()
  .partitionBy("$partition_column")
  .format("hive")
  .mode(SaveMode.append)
  .saveAsTable($new_table_name_to_be_created_in_hive)  //Table should not exist OR should be a PARTITIONED table in HIVE
Harshv
fuente
1

Aquí está la versión de PySpark para crear una tabla Hive desde un archivo de parquet. Es posible que haya generado archivos Parquet utilizando un esquema inferido y ahora desee enviar la definición a Hive metastore. También puede enviar la definición al sistema como AWS Glue o AWS Athena y no solo a la tienda de metadatos de Hive. Aquí estoy usando spark.sql para empujar / crear una tabla permanente.

   # Location where my parquet files are present.
    df = spark.read.parquet("s3://my-location/data/")
    cols = df.dtypes
    buf = []
    buf.append('CREATE EXTERNAL TABLE test123 (')
    keyanddatatypes =  df.dtypes
    sizeof = len(df.dtypes)
    print ("size----------",sizeof)
    count=1;
    for eachvalue in keyanddatatypes:
        print count,sizeof,eachvalue
        if count == sizeof:
            total = str(eachvalue[0])+str(' ')+str(eachvalue[1])
        else:
            total = str(eachvalue[0]) + str(' ') + str(eachvalue[1]) + str(',')
        buf.append(total)
        count = count + 1

    buf.append(' )')
    buf.append(' STORED as parquet ')
    buf.append("LOCATION")
    buf.append("'")
    buf.append('s3://my-location/data/')
    buf.append("'")
    buf.append("'")
    ##partition by pt
    tabledef = ''.join(buf)

    print "---------print definition ---------"
    print tabledef
    ## create a table using spark.sql. Assuming you are using spark 2.1+
    spark.sql(tabledef);
kartik
fuente
1

Para las tablas externas de Hive, uso esta función en PySpark:

def save_table(sparkSession, dataframe, database, table_name, save_format="PARQUET"):
    print("Saving result in {}.{}".format(database, table_name))
    output_schema = "," \
        .join(["{} {}".format(x.name.lower(), x.dataType) for x in list(dataframe.schema)]) \
        .replace("StringType", "STRING") \
        .replace("IntegerType", "INT") \
        .replace("DateType", "DATE") \
        .replace("LongType", "INT") \
        .replace("TimestampType", "INT") \
        .replace("BooleanType", "BOOLEAN") \
        .replace("FloatType", "FLOAT")\
        .replace("DoubleType","FLOAT")
    output_schema = re.sub(r'DecimalType[(][0-9]+,[0-9]+[)]', 'FLOAT', output_schema)

    sparkSession.sql("DROP TABLE IF EXISTS {}.{}".format(database, table_name))

    query = "CREATE EXTERNAL TABLE IF NOT EXISTS {}.{} ({}) STORED AS {} LOCATION '/user/hive/{}/{}'" \
        .format(database, table_name, output_schema, save_format, database, table_name)
    sparkSession.sql(query)
    dataframe.write.insertInto('{}.{}'.format(database, table_name),overwrite = True)
Shadowtrooper
fuente
1

En mi caso esto funciona bien:

from pyspark_llap import HiveWarehouseSession
hive = HiveWarehouseSession.session(spark).build()
hive.setDatabase("DatabaseName")
df = spark.read.format("csv").option("Header",True).load("/user/csvlocation.csv")
df.write.format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR).option("table",<tablename>).save()

¡¡Hecho!!

Puede leer los datos, le permite dar como "Empleado"

hive.executeQuery("select * from Employee").show()

Para obtener más detalles, utilice esta URL: https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.5/integrating-hive/content/hive-read-write-operations.html

MD Rijwan
fuente
0

Si desea crear una tabla de colmena (que no existe) a partir de un marco de datos (algunas veces no se puede crear DataFrameWriter.saveAsTable). StructType.toDDLayudará a enumerar las columnas como una cadena.

val df = ...

val schemaStr = df.schema.toDDL # This gives the columns 
spark.sql(s"""create table hive_table ( ${schemaStr})""")

//Now write the dataframe to the table
df.write.saveAsTable("hive_table")

hive_tablese creará en el espacio predeterminado ya que no proporcionamos ninguna base de datos en spark.sql(). stg.hive_tablese puede utilizar para crear hive_tableen la stgbase de datos.

mrsrinivas
fuente
Aquí se encuentra un ejemplo detallado: stackoverflow.com/a/56833395/1592191
mrsrinivas
0

Podrías usar la biblioteca spark-llap de Hortonworks de esta manera

import com.hortonworks.hwc.HiveWarehouseSession

df.write
  .format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector")
  .mode("append")
  .option("table", "myDatabase.myTable")
  .save()
Miguel
fuente