Cómo sobrescribir el directorio de salida en Spark

107

Tengo una aplicación de transmisión de chispas que produce un conjunto de datos por cada minuto. Necesito guardar / sobrescribir los resultados de los datos procesados.

Cuando intenté sobrescribir el conjunto de datos org.apache.hadoop.mapred.FileAlreadyExistsException detiene la ejecución.

Configuré la propiedad Spark set("spark.files.overwrite","true"), pero no hubo suerte.

¿Cómo sobrescribir o eliminar previamente los archivos de Spark?

Vijay Innamuri
fuente
1
Sí, apesta, ¿no? Lo considero una regresión a 0.9.0. Acepte mi respuesta :)
samthebest
set("spark.files.overwrite","true")sólo funciona para archivos throught añadidospark.addFile()
aiman

Respuestas:

106

ACTUALIZACIÓN: Sugerir el uso Dataframes, más algo como ... .write.mode(SaveMode.Overwrite) ....

Proxeneta práctico:

implicit class PimpedStringRDD(rdd: RDD[String]) {
    def write(p: String)(implicit ss: SparkSession): Unit = {
      import ss.implicits._
      rdd.toDF().as[String].write.mode(SaveMode.Overwrite).text(p)
    }
  }

Para versiones anteriores, intente

yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false")
val sc = SparkContext(yourSparkConf)

En 1.1.0 puede establecer la configuración de conf utilizando el script spark-submit con el indicador --conf.

ADVERTENCIA (versiones anteriores): Según @piggybox, hay un error en Spark en el que solo sobrescribirá los archivos que necesita para escribir sus part-archivos, cualquier otro archivo se dejará sin eliminar.

samthebest
fuente
29
Para Spark 1.4:df.write.mode(SaveMode.Overwrite).parquet(path)
Ha Pham
Para Spark SQL, tiene opciones para definir SaveMode para Core Spark, no tiene nada de eso. Realmente me gustaría tener algo de ese tipo de función para saveAsTextFile y otras transformaciones
Murtaza Kanchwala
3
Un problema oculto: en comparación con la solución de @ pzecevic para borrar toda la carpeta a través de HDFS, en este enfoque, Spark solo sobrescribirá los archivos de pieza con el mismo nombre de archivo en la carpeta de salida. Esto funciona la mayor parte del tiempo, pero si hay algo más, como archivos de piezas adicionales de otro trabajo de Spark / Hadoop en la carpeta, esto no sobrescribirá estos archivos.
alcancía
6
También puede utilizar el df.write.mode(mode: String).parquet(path)modo Where: String puede ser: "sobrescribir", "añadir", "ignorar", "error".
centeno
1
@avocado Sí, lo creo, las API de Spark empeoran cada vez más en cada lanzamiento: P
samthebest
27

La documentación del parámetro spark.files.overwritedice lo siguiente: "Si se sobrescriben los archivos agregados SparkContext.addFile()cuando el archivo de destino existe y su contenido no coincide con el de la fuente". Por lo tanto, no tiene ningún efecto en el método saveAsTextFiles.

Puede hacer esto antes de guardar el archivo:

val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _ : Throwable => { } }

Como se explica aquí: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696. html

pzecevic
fuente
29
¿qué pasa con pyspark?
javadba
La siguiente respuesta para usar 'write.mode (SaveMode.Overwrite)' es el camino a seguir
YaOg
hdfs puede eliminar los archivos nuevos a medida que ingresan, ya que aún está eliminando los antiguos.
Jake
25

De la documentación de pyspark.sql.DataFrame.save (actualmente en 1.3.1), puede especificar mode='overwrite'al guardar un DataFrame:

myDataFrame.save(path='myPath', source='parquet', mode='overwrite')

He verificado que esto incluso eliminará los archivos de partición sobrantes. Entonces, si originalmente dijiste 10 particiones / archivos, pero luego sobrescribiste la carpeta con un DataFrame que solo tenía 6 particiones, la carpeta resultante tendrá las 6 particiones / archivos.

Consulte la documentación de Spark SQL para obtener más información sobre las opciones de modo.

dnlbrky
fuente
2
Verdadero y útil, gracias, pero una solución específica de DataFrame spark.hadoop.validateOutputSpecsfuncionará en todas las API de Spark.
samthebest
Por alguna razón, spark.hadoop.validateOutputSpecsno funcionó para mí en 1.3, pero esto funciona.
Eric Walker
1
@samthebest Con la save(... , mode=ruta, puede sobrescribir un conjunto de archivos, agregar otro, etc. dentro del mismo contexto de Spark. ¿No spark.hadoop.validateOutputSpecste limitaría a un solo modo por contexto?
dnlbrky
1
@dnlbrky El OP no solicitó agregar. Como dije, cierto, útil, pero innecesario. Si el OP preguntaba "cómo agrego", entonces se podría dar una amplia gama de respuestas. Pero no entremos en eso. También le aconsejo que considere usar la versión Scala de DataFrames, ya que tiene seguridad de tipos y más verificación; por ejemplo, si tuviera un error tipográfico en "sobrescribir", no lo sabría hasta que se evaluó el DAG, lo que en un trabajo de Big Data podría ser 2 horas despues !! Si usa la versión de Scala, el compilador comprobará todo desde el principio. Bastante genial y muy importante para Big Data.
samthebest
15

df.write.mode('overwrite').parquet("/output/folder/path")funciona si desea sobrescribir un archivo de parquet usando python. Esto está en Spark 1.6.2. La API puede ser diferente en versiones posteriores

akn
fuente
Sí, esto funciona muy bien para mi requisito (Databricks)
Nick.McDermaid
4
  val jobName = "WordCount";
  //overwrite the output directory in spark  set("spark.hadoop.validateOutputSpecs", "false")
  val conf = new 
  SparkConf().setAppName(jobName).set("spark.hadoop.validateOutputSpecs", "false");
  val sc = new SparkContext(conf)
vaquar khan
fuente
Solo para Spark 1, en la última versión, usodf.write.mode(SaveMode.Overwrite)
ChikuMiku
3

Esta versión sobrecargada de la función de guardar funciona para mí:

yourDF.save (outputPath, org.apache.spark.sql.SaveMode.valueOf ("Overwrite"))

El ejemplo anterior sobrescribiría una carpeta existente. El modo de guardado también puede tomar estos parámetros ( https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html ):

Agregar : el modo de agregar significa que al guardar un DataFrame en una fuente de datos, si los datos / tabla ya existen, se espera que el contenido del DataFrame se agregue a los datos existentes.

ErrorIfExists : El modo ErrorIfExists significa que al guardar un DataFrame en una fuente de datos, si los datos ya existen, se espera que se lance una excepción.

Ignorar : el modo Ignorar significa que al guardar un DataFrame en una fuente de datos, si los datos ya existen, se espera que la operación de guardado no guarde el contenido del DataFrame y no cambie los datos existentes.

Shay
fuente
1

Si está dispuesto a utilizar su propio formato de salida personalizado, también podrá obtener el comportamiento deseado con RDD.

Eche un vistazo a las siguientes clases: FileOutputFormat , FileOutputCommitter

En formato de salida de archivo, tiene un método llamado checkOutputSpecs, que verifica si existe el directorio de salida. En FileOutputCommitter tiene el commitJob que generalmente transfiere datos desde el directorio temporal a su lugar final.

No pude verificarlo todavía (lo haría, tan pronto como tenga unos minutos libres) pero teóricamente: si extiendo FileOutputFormat y anulo checkOutputSpecs a un método que no arroja una excepción en el directorio ya existe, y ajusto el El método commitJob de mi committer de salida personalizado para realizar cualquier lógica que desee (por ejemplo, anular algunos de los archivos, agregar otros), de lo que también puedo lograr el comportamiento deseado con los RDD.

El formato de salida se pasa a: saveAsNewAPIHadoopFile (que es el método llamado saveAsTextFile para guardar los archivos). Y el confirmador de salida se configura en el nivel de la aplicación.

Michael Kopaniov
fuente
Evitaría acercarme a subclasificar FileOutputCommitter si puedes evitarlo: ese es un código aterrador. Hadoop 3.0 agrega un punto de complemento donde FileOutputFormat puede tomar diferentes implementaciones de una superclase refactorizada (PathOutputCommitter). El S3 de Netflix se escribirá en el lugar en un árbol particionado, solo resolverá conflictos (fallará, eliminará, agregará) en la confirmación del trabajo, y solo en las particiones actualizadas
stevel