¿Es posible guardar DataFrame
en Spark directamente en Hive?
He tratado con la conversión DataFrame
de Rdd
a continuación, guardar como archivo de texto y luego cargar en la colmena. Pero me pregunto si puedo guardar directamente dataframe
en colmena.
scala
apache-spark
hive
apache-spark-sql
Gourav
fuente
fuente
temporary
mesa con lahive
mesa? Al hacerloshow tables
, solo incluye lashive
tablas para mispark 2.3.0
instalaciónUtilice
DataFrameWriter.saveAsTable
. (df.write.saveAsTable(...)
) Consulte la Guía de Spark SQL y DataFrame .fuente
df.write().saveAsTable(tableName)
también escribirá datos de transmisión en la tabla?No veo
df.write.saveAsTable(...)
obsoleto en la documentación de Spark 2.0. Nos ha funcionado en Amazon EMR. Fuimos perfectamente capaces de leer datos de S3 en un marco de datos, procesarlos, crear una tabla a partir del resultado y leerlos con MicroStrategy. Sin embargo, la respuesta de Vinay también ha funcionado.fuente
necesitas tener / crear un HiveContext
import org.apache.spark.sql.hive.HiveContext; HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
Luego guarde directamente el marco de datos o seleccione las columnas para almacenar como tabla de colmena
df es marco de datos
df.write().mode("overwrite").saveAsTable("schemaName.tableName");
o
df.select(df.col("col1"),df.col("col2"), df.col("col3")) .write().mode("overwrite").saveAsTable("schemaName.tableName");
o
df.write().mode(SaveMode.Overwrite).saveAsTable("dbName.tableName");
Los modos de guardado son Agregar / Ignorar / Sobrescribir / ErrorIfExists
Agregué aquí la definición de HiveContext de Spark Documentation,
Además del SQLContext básico, también puede crear un HiveContext, que proporciona un superconjunto de la funcionalidad proporcionada por el SQLContext básico. Las características adicionales incluyen la capacidad de escribir consultas utilizando el analizador de HiveQL más completo, acceso a UDF de Hive y la capacidad de leer datos de tablas de Hive. Para usar un HiveContext, no es necesario tener una configuración de Hive existente, y todas las fuentes de datos disponibles para un SQLContext todavía están disponibles. HiveContext solo se empaqueta por separado para evitar incluir todas las dependencias de Hive en la compilación predeterminada de Spark.
en Spark versión 1.6.2, el uso de "dbName.tableName" da este error:
fuente
df.write().mode...
necesita ser cambiado adf.write.mode...
Guardar en Hive es solo una cuestión de usar el
write()
método de su SQLContext:Ver https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/DataFrameWriter.html#saveAsTable(java.lang.String)
Desde Spark 2.2: use DataSet en lugar de DataFrame.
fuente
From Spark 2.2: use DataSet instead DataFrame.
Lamento escribir tarde a la publicación, pero no veo una respuesta aceptada.
df.write().saveAsTable
arrojaráAnalysisException
y no es compatible con la mesa HIVE.¡Almacenar DF como
df.write().format("hive")
debería ser el truco!Sin embargo, si eso no funciona, siguiendo los comentarios y respuestas anteriores, esta es la mejor solución en mi opinión (aunque abierta a sugerencias).
El mejor enfoque es crear explícitamente la tabla HIVE (incluida la tabla PARTICIONADA),
def createHiveTable: Unit ={ spark.sql("CREATE TABLE $hive_table_name($fields) " + "PARTITIONED BY ($partition_column String) STORED AS $StorageType") }
guardar DF como tabla temporal,
df.createOrReplaceTempView("$tempTableName")
e inserte en la tabla PARTITIONED HIVE:
spark.sql("insert into table default.$hive_table_name PARTITION($partition_column) select * from $tempTableName") spark.sql("select * from default.$hive_table_name").show(1000,false)
Por supuesto, la ÚLTIMA COLUMNA en DF será la COLUMNA DE PARTICIÓN así que cree la tabla HIVE en consecuencia!
¡Por favor comente si funciona! o no.
--ACTUALIZAR--
df.write() .partitionBy("$partition_column") .format("hive") .mode(SaveMode.append) .saveAsTable($new_table_name_to_be_created_in_hive) //Table should not exist OR should be a PARTITIONED table in HIVE
fuente
Aquí está la versión de PySpark para crear una tabla Hive desde un archivo de parquet. Es posible que haya generado archivos Parquet utilizando un esquema inferido y ahora desee enviar la definición a Hive metastore. También puede enviar la definición al sistema como AWS Glue o AWS Athena y no solo a la tienda de metadatos de Hive. Aquí estoy usando spark.sql para empujar / crear una tabla permanente.
# Location where my parquet files are present. df = spark.read.parquet("s3://my-location/data/") cols = df.dtypes buf = [] buf.append('CREATE EXTERNAL TABLE test123 (') keyanddatatypes = df.dtypes sizeof = len(df.dtypes) print ("size----------",sizeof) count=1; for eachvalue in keyanddatatypes: print count,sizeof,eachvalue if count == sizeof: total = str(eachvalue[0])+str(' ')+str(eachvalue[1]) else: total = str(eachvalue[0]) + str(' ') + str(eachvalue[1]) + str(',') buf.append(total) count = count + 1 buf.append(' )') buf.append(' STORED as parquet ') buf.append("LOCATION") buf.append("'") buf.append('s3://my-location/data/') buf.append("'") buf.append("'") ##partition by pt tabledef = ''.join(buf) print "---------print definition ---------" print tabledef ## create a table using spark.sql. Assuming you are using spark 2.1+ spark.sql(tabledef);
fuente
Para las tablas externas de Hive, uso esta función en PySpark:
def save_table(sparkSession, dataframe, database, table_name, save_format="PARQUET"): print("Saving result in {}.{}".format(database, table_name)) output_schema = "," \ .join(["{} {}".format(x.name.lower(), x.dataType) for x in list(dataframe.schema)]) \ .replace("StringType", "STRING") \ .replace("IntegerType", "INT") \ .replace("DateType", "DATE") \ .replace("LongType", "INT") \ .replace("TimestampType", "INT") \ .replace("BooleanType", "BOOLEAN") \ .replace("FloatType", "FLOAT")\ .replace("DoubleType","FLOAT") output_schema = re.sub(r'DecimalType[(][0-9]+,[0-9]+[)]', 'FLOAT', output_schema) sparkSession.sql("DROP TABLE IF EXISTS {}.{}".format(database, table_name)) query = "CREATE EXTERNAL TABLE IF NOT EXISTS {}.{} ({}) STORED AS {} LOCATION '/user/hive/{}/{}'" \ .format(database, table_name, output_schema, save_format, database, table_name) sparkSession.sql(query) dataframe.write.insertInto('{}.{}'.format(database, table_name),overwrite = True)
fuente
En mi caso esto funciona bien:
from pyspark_llap import HiveWarehouseSession hive = HiveWarehouseSession.session(spark).build() hive.setDatabase("DatabaseName") df = spark.read.format("csv").option("Header",True).load("/user/csvlocation.csv") df.write.format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR).option("table",<tablename>).save()
¡¡Hecho!!
Puede leer los datos, le permite dar como "Empleado"
hive.executeQuery("select * from Employee").show()
Para obtener más detalles, utilice esta URL: https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.5/integrating-hive/content/hive-read-write-operations.html
fuente
val df = ... val schemaStr = df.schema.toDDL # This gives the columns spark.sql(s"""create table hive_table ( ${schemaStr})""") //Now write the dataframe to the table df.write.saveAsTable("hive_table")
hive_table
se creará en el espacio predeterminado ya que no proporcionamos ninguna base de datos enspark.sql()
.stg.hive_table
se puede utilizar para crearhive_table
en lastg
base de datos.fuente
Podrías usar la biblioteca spark-llap de Hortonworks de esta manera
import com.hortonworks.hwc.HiveWarehouseSession df.write .format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector") .mode("append") .option("table", "myDatabase.myTable") .save()
fuente