Escriba un solo archivo CSV usando spark-csv

Respuestas:

168

Está creando una carpeta con varios archivos, porque cada partición se guarda individualmente. Si necesita un solo archivo de salida (aún en una carpeta), puede repartition(preferiblemente si los datos ascendentes son grandes, pero requieren una reproducción aleatoria):

df
   .repartition(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

o bien coalesce:

df
   .coalesce(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

marco de datos antes de guardar:

Todos los datos se escribirán en mydata.csv/part-00000. Antes de usar esta opción, asegúrese de comprender lo que está sucediendo y cuál es el costo de transferir todos los datos a un solo trabajador . Si utiliza un sistema de archivos distribuido con replicación, los datos se transferirán varias veces: primero se recuperarán a un solo trabajador y luego se distribuirán entre los nodos de almacenamiento.

Como alternativa se puede dejar el código tal como es y utiliza herramientas de uso general como cato HDFSgetmerge simplemente fusionar todas las partes después.

zero323
fuente
6
también puede usar coalesce: df.coalesce (1) .write.format ("com.databricks.spark.csv") .option ("header", "true") .save ("mydata.csv")
ravi
Spark 1.6 arroja un error cuando lo configuramos .coalesce(1), dice alguna excepción FileNotFoundException en el directorio _temporary. Todavía es un error en Spark
Harsha
@Harsha Improbable. Más bien un simple resultado de coalesce(1)ser muy caro y generalmente no práctico.
zero323
De acuerdo @ zero323, pero si tiene un requisito especial para consolidar en un archivo, aún debería ser posible dado que tiene suficientes recursos y tiempo.
Harsha
2
@Harsha No digo que no lo haya. Si ajusta correctamente GC, debería funcionar bien, pero es simplemente una pérdida de tiempo y lo más probable es que afecte el rendimiento general. Así que, personalmente, no veo ninguna razón para molestarme, especialmente porque es trivialmente simple fusionar archivos fuera de Spark sin preocuparme por el uso de la memoria en absoluto.
zero323
36

Si está ejecutando Spark con HDFS, he estado resolviendo el problema escribiendo archivos csv normalmente y aprovechando HDFS para realizar la fusión. Estoy haciendo eso en Spark (1.6) directamente:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit =  {
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
   // the "true" setting deletes the source files once they are merged into the new output
}


val newData = << create your dataframe >>


val outputfile = "/user/feeds/project/outputs/subject"  
var filename = "myinsights"
var outputFileName = outputfile + "/temp_" + filename 
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob  = outputFileName

    newData.write
        .format("com.databricks.spark.csv")
        .option("header", "false")
        .mode("overwrite")
        .save(outputFileName)
    merge(mergeFindGlob, mergedFileName )
    newData.unpersist()

No recuerdo dónde aprendí este truco, pero podría funcionar para ti.

Minkymorgan
fuente
No lo he probado, y sospecho que puede que no sea sencillo.
Minkymorgan
1
Gracias. He añadido una respuesta que funciona en Databricks
Josías Yoder
@Minkymorgan tengo un problema similar, pero no es capaz de hacerlo correctamente ..Can que por favor vistazo a esta pregunta stackoverflow.com/questions/46812388/...
SUDARSHAN
4
@SUDARSHAN Mi función anterior funciona con datos sin comprimir. En su ejemplo, creo que está utilizando la compresión gzip mientras escribe archivos, y luego, tratando de fusionarlos, lo que falla. Eso no va a funcionar, ya que no se pueden combinar archivos gzip. Gzip no es un algoritmo de compresión divisible, por lo que ciertamente no es "fusionable". Puede probar la compresión "rápida" o "bz2", pero la intuición es que esto también fallará en la fusión. Probablemente lo mejor sea eliminar la compresión, fusionar archivos sin procesar y luego comprimir utilizando un códec divisible.
Minkymorgan
y ¿qué pasa si quiero conservar el encabezado? se duplica para cada parte del archivo
Normal
32

Puede que llegue un poco tarde al juego aquí, pero usar coalesce(1)o repartition(1)puede funcionar para conjuntos de datos pequeños, pero los conjuntos de datos grandes se colocarían todos en una partición en un nodo. Es probable que esto genere errores OOM o, en el mejor de los casos, que se procese lentamente.

Te sugiero que uses la FileUtil.copyMerge()función de la API de Hadoop. Esto fusionará las salidas en un solo archivo.

EDITAR : esto lleva efectivamente los datos al controlador en lugar de a un nodo ejecutor. Coalesce()Estaría bien si un solo ejecutor tiene más RAM para usar que el controlador.

EDIT 2 : copyMerge()se está eliminando en Hadoop 3.0. Consulte el siguiente artículo de desbordamiento de pila para obtener más información sobre cómo trabajar con la versión más reciente: ¿Cómo hacer CopyMerge en Hadoop 3.0?

etspaceman
fuente
¿Alguna idea sobre cómo obtener un csv con una fila de encabezado de esta manera? No me gustaría que el archivo produjera un encabezado, ya que eso intercalaría encabezados en todo el archivo, uno para cada partición.
nojo
Hay una opción que he usado en el pasado documentada aquí: markhneedham.com/blog/2014/11/30/…
etspaceman
@etspaceman Genial. Todavía no tengo una buena manera de hacer esto, desafortunadamente, ya que necesito poder hacer esto en Java (o Spark, pero de una manera que no consuma mucha memoria y pueda trabajar con archivos grandes) . Todavía no puedo creer que eliminaron esta llamada a la API ... este es un uso muy común, incluso si no lo usan exactamente otras aplicaciones en el ecosistema de Hadoop.
woot
20

Si está usando Databricks y puede colocar todos los datos en la RAM de un trabajador (y por lo tanto puede usar .coalesce(1)), puede usar dbfs para buscar y mover el archivo CSV resultante:

val fileprefix= "/mnt/aws/path/file-prefix"

dataset
  .coalesce(1)       
  .write             
//.mode("overwrite") // I usually don't use this, but you may want to.
  .option("header", "true")
  .option("delimiter","\t")
  .csv(fileprefix+".tmp")

val partition_path = dbutils.fs.ls(fileprefix+".tmp/")
     .filter(file=>file.name.endsWith(".csv"))(0).path

dbutils.fs.cp(partition_path,fileprefix+".tab")

dbutils.fs.rm(fileprefix+".tmp",recurse=true)

Si su archivo no cabe en la RAM del trabajador, puede considerar la sugerencia de chaotic3quilibrium de usar FileUtils.copyMerge () . No he hecho esto y aún no sé si es posible o no, por ejemplo, en S3.

Esta respuesta se basa en respuestas anteriores a esta pregunta, así como en mis propias pruebas del fragmento de código proporcionado. Lo publiqué originalmente en Databricks y lo estoy volviendo a publicar aquí.

La mejor documentación para la opción recursiva de rm de dbfs que he encontrado está en un foro de Databricks .

Josiah Yoder
fuente
3

Una solución que funciona para S3 modificado de Minkymorgan.

Simplemente pase la ruta del directorio temporal particionado (con un nombre diferente al de la ruta final) como el srcPathúnico csv / txt final como destPath Especifique también deleteSourcesi desea eliminar el directorio original.

/**
* Merges multiple partitions of spark text file output into single file. 
* @param srcPath source directory of partitioned files
* @param dstPath output path of individual path
* @param deleteSource whether or not to delete source directory after merging
* @param spark sparkSession
*/
def mergeTextFiles(srcPath: String, dstPath: String, deleteSource: Boolean): Unit =  {
  import org.apache.hadoop.fs.FileUtil
  import java.net.URI
  val config = spark.sparkContext.hadoopConfiguration
  val fs: FileSystem = FileSystem.get(new URI(srcPath), config)
  FileUtil.copyMerge(
    fs, new Path(srcPath), fs, new Path(dstPath), deleteSource, config, null
  )
}
John Zhu
fuente
La implementación de copyMerge enumera todos los archivos e itera sobre ellos, esto no es seguro en s3. si escribe sus archivos y luego los enumera, esto no garantiza que todos ellos se enumerarán. ver [esto | docs.aws.amazon.com/AmazonS3/latest/dev/…
LiranBo
3

chispas df.write() API de Spark creará múltiples archivos de piezas dentro de la ruta dada ... para forzar a Spark escribir solo un archivo de una sola pieza, el uso en df.coalesce(1).write.csv(...)lugar de df.repartition(1).write.csv(...)coalesce es una transformación estrecha, mientras que la repartición es una transformación amplia, consulte Spark - repartition () vs coalesce ()

df.coalesce(1).write.csv(filepath,header=True) 

creará una carpeta en la ruta de archivo dada con un solo part-0001-...-c000.csvuso de archivo

cat filepath/part-0001-...-c000.csv > filename_you_want.csv 

tener un nombre de archivo fácil de usar

pprasad009
fuente
alternativamente, si el marco de datos no es demasiado grande (~ GB o puede caber en la memoria del controlador), también puede usar df.toPandas().to_csv(path)esto escribirá un solo csv con su nombre de archivo preferido
pprasad009
1
Uf, es tan frustrante cómo esto solo se puede hacer convirtiéndose en pandas. ¿Qué tan difícil es escribir un archivo sin algún UUID en él?
ijoseph
2

repartición / fusión en 1 partición antes de guardar (aún obtendría una carpeta pero tendría un archivo de parte)

Arnon Rotem-Gal-Oz
fuente
2

puedes usar rdd.coalesce(1, true).saveAsTextFile(path)

almacenará los datos como un archivo individual en la ruta / part-00000

Gourav
fuente
1
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.sql.{DataFrame,SaveMode,SparkSession}
import org.apache.spark.sql.functions._

Lo resolví usando el siguiente enfoque (hdfs renombrar el nombre del archivo): -

Paso 1: - (Crate Data Frame y escriba en HDFS)

df.coalesce(1).write.format("csv").option("header", "false").mode(SaveMode.Overwrite).save("/hdfsfolder/blah/")

Paso 2: - (Crear configuración de Hadoop)

val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)

Paso 3: - (Obtener ruta en la ruta de la carpeta hdfs)

val pathFiles = new Path("/hdfsfolder/blah/")

Paso 4: - (Obtenga los nombres de los archivos Spark de la carpeta hdfs)

val fileNames = hdfs.listFiles(pathFiles, false)
println(fileNames)

setp5: - (crea una lista mutable de Scala para guardar todos los nombres de archivo y agregarlo a la lista)

    var fileNamesList = scala.collection.mutable.MutableList[String]()
    while (fileNames.hasNext) {
      fileNamesList += fileNames.next().getPath.getName
    }
    println(fileNamesList)

Paso 6: - (filtrar el orden de archivos _SUCESS de la lista de nombres de archivos scala)

    // get files name which are not _SUCCESS
    val partFileName = fileNamesList.filterNot(filenames => filenames == "_SUCCESS")

paso 7: - (convierta la lista de scala en una cadena y agregue el nombre de archivo deseado a la cadena de la carpeta hdfs y luego aplique el cambio de nombre)

val partFileSourcePath = new Path("/yourhdfsfolder/"+ partFileName.mkString(""))
    val desiredCsvTargetPath = new Path(/yourhdfsfolder/+ "op_"+ ".csv")
    hdfs.rename(partFileSourcePath , desiredCsvTargetPath)
Sri Hari Kali Charan Tummala
fuente
1

Estoy usando esto en Python para obtener un solo archivo:

df.toPandas().to_csv("/tmp/my.csv", sep=',', header=True, index=False)
Kees C. Bakker
fuente
1

Esta respuesta amplía la respuesta aceptada, brinda más contexto y proporciona fragmentos de código que puede ejecutar en Spark Shell en su máquina.

Más contexto sobre la respuesta aceptada

La respuesta aceptada puede darle la impresión de que el código de muestra genera un solo mydata.csvarchivo y ese no es el caso. Demostremos:

val df = Seq("one", "two", "three").toDF("num")
df
  .repartition(1)
  .write.csv(sys.env("HOME")+ "/Documents/tmp/mydata.csv")

Esto es lo que se genera:

Documents/
  tmp/
    mydata.csv/
      _SUCCESS
      part-00000-b3700504-e58b-4552-880b-e7b52c60157e-c000.csv

NB mydata.csves una carpeta en la respuesta aceptada, ¡no es un archivo!

Cómo generar un solo archivo con un nombre específico

Podemos usar Spark-Daria para escribir un solo mydata.csvarchivo.

import com.github.mrpowers.spark.daria.sql.DariaWriters
DariaWriters.writeSingleFile(
    df = df,
    format = "csv",
    sc = spark.sparkContext,
    tmpFolder = sys.env("HOME") + "/Documents/better/staging",
    filename = sys.env("HOME") + "/Documents/better/mydata.csv"
)

Esto generará el archivo de la siguiente manera:

Documents/
  better/
    mydata.csv

Rutas S3

Deberá pasar las rutas s3a DariaWriters.writeSingleFilepara usar este método en S3:

DariaWriters.writeSingleFile(
    df = df,
    format = "csv",
    sc = spark.sparkContext,
    tmpFolder = "s3a://bucket/data/src",
    filename = "s3a://bucket/data/dest/my_cool_file.csv"
)

Ver aquí para obtener más información.

Evitar copyMerge

copyMerge se eliminó de Hadoop 3. La DariaWriters.writeSingleFileimplementación utiliza fs.rename, como se describe aquí . Spark 3 todavía usaba Hadoop 2 , por lo que las implementaciones de copyMerge funcionarán en 2020. No estoy seguro de cuándo Spark se actualizará a Hadoop 3, pero es mejor evitar cualquier enfoque de copyMerge que haga que su código se rompa cuando Spark actualice Hadoop.

Código fuente

Busque el DariaWritersobjeto en el código fuente de spark-daria si desea inspeccionar la implementación.

Implementación de PySpark

Es más fácil escribir un solo archivo con PySpark porque puede convertir el DataFrame en un Pandas DataFrame que se escribe como un solo archivo por defecto.

from pathlib import Path
home = str(Path.home())
data = [
    ("jellyfish", "JALYF"),
    ("li", "L"),
    ("luisa", "LAS"),
    (None, None)
]
df = spark.createDataFrame(data, ["word", "expected"])
df.toPandas().to_csv(home + "/Documents/tmp/mydata-from-pyspark.csv", sep=',', header=True, index=False)

Limitaciones

El DariaWriters.writeSingleFileenfoque Scala y el df.toPandas()enfoque Python solo funcionan para pequeños conjuntos de datos. Los conjuntos de datos enormes no se pueden escribir como archivos individuales. Escribir datos como un solo archivo no es óptimo desde la perspectiva del rendimiento porque los datos no se pueden escribir en paralelo.

Potestades
fuente
0

al usar Listbuffer podemos guardar datos en un solo archivo:

import java.io.FileWriter
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer
    val text = spark.read.textFile("filepath")
    var data = ListBuffer[String]()
    for(line:String <- text.collect()){
      data += line
    }
    val writer = new FileWriter("filepath")
    data.foreach(line => writer.write(line.toString+"\n"))
    writer.close()
siddhu salvi
fuente
-2

Hay una forma más de usar Java

import java.io._

def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit) 
  {
     val p = new java.io.PrintWriter(f);  
     try { op(p) } 
     finally { p.close() }
  } 

printToFile(new File("C:/TEMP/df.csv")) { p => df.collect().foreach(p.println)}
Sergio Alyoshkin
fuente
nombre 'verdadero' no está definido
Arron