Tengo una aplicación de transmisión de chispas que produce un conjunto de datos por cada minuto. Necesito guardar / sobrescribir los resultados de los datos procesados.
Cuando intenté sobrescribir el conjunto de datos org.apache.hadoop.mapred.FileAlreadyExistsException detiene la ejecución.
Configuré la propiedad Spark set("spark.files.overwrite","true")
, pero no hubo suerte.
¿Cómo sobrescribir o eliminar previamente los archivos de Spark?
apache-spark
Vijay Innamuri
fuente
fuente
set("spark.files.overwrite","true")
sólo funciona para archivos throught añadidospark.addFile()
Respuestas:
ACTUALIZACIÓN: Sugerir el uso
Dataframes
, más algo como... .write.mode(SaveMode.Overwrite) ...
.Proxeneta práctico:
Para versiones anteriores, intente
En 1.1.0 puede establecer la configuración de conf utilizando el script spark-submit con el indicador --conf.
ADVERTENCIA (versiones anteriores): Según @piggybox, hay un error en Spark en el que solo sobrescribirá los archivos que necesita para escribir sus
part-
archivos, cualquier otro archivo se dejará sin eliminar.fuente
Spark 1.4
:df.write.mode(SaveMode.Overwrite).parquet(path)
df.write.mode(mode: String).parquet(path)
modo Where: String puede ser: "sobrescribir", "añadir", "ignorar", "error".ya que
df.save(path, source, mode)
está en desuso, ( http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.DataFrame )usar
df.write.format(source).mode("overwrite").save(path)
donde df.write es DataFrameWriter
'fuente' puede ser ("com.databricks.spark.avro" | "parquet" | "json")
fuente
source
también puede sercsv
La documentación del parámetro
spark.files.overwrite
dice lo siguiente: "Si se sobrescriben los archivos agregadosSparkContext.addFile()
cuando el archivo de destino existe y su contenido no coincide con el de la fuente". Por lo tanto, no tiene ningún efecto en el método saveAsTextFiles.Puede hacer esto antes de guardar el archivo:
Como se explica aquí: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696. html
fuente
De la documentación de pyspark.sql.DataFrame.save (actualmente en 1.3.1), puede especificar
mode='overwrite'
al guardar un DataFrame:He verificado que esto incluso eliminará los archivos de partición sobrantes. Entonces, si originalmente dijiste 10 particiones / archivos, pero luego sobrescribiste la carpeta con un DataFrame que solo tenía 6 particiones, la carpeta resultante tendrá las 6 particiones / archivos.
Consulte la documentación de Spark SQL para obtener más información sobre las opciones de modo.
fuente
spark.hadoop.validateOutputSpecs
funcionará en todas las API de Spark.spark.hadoop.validateOutputSpecs
no funcionó para mí en 1.3, pero esto funciona.save(... , mode=
ruta, puede sobrescribir un conjunto de archivos, agregar otro, etc. dentro del mismo contexto de Spark. ¿Nospark.hadoop.validateOutputSpecs
te limitaría a un solo modo por contexto?df.write.mode('overwrite').parquet("/output/folder/path")
funciona si desea sobrescribir un archivo de parquet usando python. Esto está en Spark 1.6.2. La API puede ser diferente en versiones posterioresfuente
fuente
df.write.mode(SaveMode.Overwrite)
Esta versión sobrecargada de la función de guardar funciona para mí:
yourDF.save (outputPath, org.apache.spark.sql.SaveMode.valueOf ("Overwrite"))
El ejemplo anterior sobrescribiría una carpeta existente. El modo de guardado también puede tomar estos parámetros ( https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html ):
Agregar : el modo de agregar significa que al guardar un DataFrame en una fuente de datos, si los datos / tabla ya existen, se espera que el contenido del DataFrame se agregue a los datos existentes.
ErrorIfExists : El modo ErrorIfExists significa que al guardar un DataFrame en una fuente de datos, si los datos ya existen, se espera que se lance una excepción.
Ignorar : el modo Ignorar significa que al guardar un DataFrame en una fuente de datos, si los datos ya existen, se espera que la operación de guardado no guarde el contenido del DataFrame y no cambie los datos existentes.
fuente
Si está dispuesto a utilizar su propio formato de salida personalizado, también podrá obtener el comportamiento deseado con RDD.
Eche un vistazo a las siguientes clases: FileOutputFormat , FileOutputCommitter
En formato de salida de archivo, tiene un método llamado checkOutputSpecs, que verifica si existe el directorio de salida. En FileOutputCommitter tiene el commitJob que generalmente transfiere datos desde el directorio temporal a su lugar final.
No pude verificarlo todavía (lo haría, tan pronto como tenga unos minutos libres) pero teóricamente: si extiendo FileOutputFormat y anulo checkOutputSpecs a un método que no arroja una excepción en el directorio ya existe, y ajusto el El método commitJob de mi committer de salida personalizado para realizar cualquier lógica que desee (por ejemplo, anular algunos de los archivos, agregar otros), de lo que también puedo lograr el comportamiento deseado con los RDD.
El formato de salida se pasa a: saveAsNewAPIHadoopFile (que es el método llamado saveAsTextFile para guardar los archivos). Y el confirmador de salida se configura en el nivel de la aplicación.
fuente