Está creando una carpeta con varios archivos, porque cada partición se guarda individualmente. Si necesita un solo archivo de salida (aún en una carpeta), puede repartition
(preferiblemente si los datos ascendentes son grandes, pero requieren una reproducción aleatoria):
df
.repartition(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")
o bien coalesce
:
df
.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")
marco de datos antes de guardar:
Todos los datos se escribirán en mydata.csv/part-00000
. Antes de usar esta opción, asegúrese de comprender lo que está sucediendo y cuál es el costo de transferir todos los datos a un solo trabajador . Si utiliza un sistema de archivos distribuido con replicación, los datos se transferirán varias veces: primero se recuperarán a un solo trabajador y luego se distribuirán entre los nodos de almacenamiento.
Como alternativa se puede dejar el código tal como es y utiliza herramientas de uso general como cat
o HDFSgetmerge
simplemente fusionar todas las partes después.
.coalesce(1)
, dice alguna excepción FileNotFoundException en el directorio _temporary. Todavía es un error en Sparkcoalesce(1)
ser muy caro y generalmente no práctico.Si está ejecutando Spark con HDFS, he estado resolviendo el problema escribiendo archivos csv normalmente y aprovechando HDFS para realizar la fusión. Estoy haciendo eso en Spark (1.6) directamente:
No recuerdo dónde aprendí este truco, pero podría funcionar para ti.
fuente
Puede que llegue un poco tarde al juego aquí, pero usar
coalesce(1)
orepartition(1)
puede funcionar para conjuntos de datos pequeños, pero los conjuntos de datos grandes se colocarían todos en una partición en un nodo. Es probable que esto genere errores OOM o, en el mejor de los casos, que se procese lentamente.Te sugiero que uses la
FileUtil.copyMerge()
función de la API de Hadoop. Esto fusionará las salidas en un solo archivo.EDITAR : esto lleva efectivamente los datos al controlador en lugar de a un nodo ejecutor.
Coalesce()
Estaría bien si un solo ejecutor tiene más RAM para usar que el controlador.EDIT 2 :
copyMerge()
se está eliminando en Hadoop 3.0. Consulte el siguiente artículo de desbordamiento de pila para obtener más información sobre cómo trabajar con la versión más reciente: ¿Cómo hacer CopyMerge en Hadoop 3.0?fuente
Si está usando Databricks y puede colocar todos los datos en la RAM de un trabajador (y por lo tanto puede usar
.coalesce(1)
), puede usar dbfs para buscar y mover el archivo CSV resultante:Si su archivo no cabe en la RAM del trabajador, puede considerar la sugerencia de chaotic3quilibrium de usar FileUtils.copyMerge () . No he hecho esto y aún no sé si es posible o no, por ejemplo, en S3.
Esta respuesta se basa en respuestas anteriores a esta pregunta, así como en mis propias pruebas del fragmento de código proporcionado. Lo publiqué originalmente en Databricks y lo estoy volviendo a publicar aquí.
La mejor documentación para la opción recursiva de rm de dbfs que he encontrado está en un foro de Databricks .
fuente
Una solución que funciona para S3 modificado de Minkymorgan.
Simplemente pase la ruta del directorio temporal particionado (con un nombre diferente al de la ruta final) como el
srcPath
único csv / txt final comodestPath
Especifique tambiéndeleteSource
si desea eliminar el directorio original.fuente
chispas
df.write()
API de Spark creará múltiples archivos de piezas dentro de la ruta dada ... para forzar a Spark escribir solo un archivo de una sola pieza, el uso endf.coalesce(1).write.csv(...)
lugar dedf.repartition(1).write.csv(...)
coalesce es una transformación estrecha, mientras que la repartición es una transformación amplia, consulte Spark - repartition () vs coalesce ()creará una carpeta en la ruta de archivo dada con un solo
part-0001-...-c000.csv
uso de archivotener un nombre de archivo fácil de usar
fuente
df.toPandas().to_csv(path)
esto escribirá un solo csv con su nombre de archivo preferidorepartición / fusión en 1 partición antes de guardar (aún obtendría una carpeta pero tendría un archivo de parte)
fuente
puedes usar
rdd.coalesce(1, true).saveAsTextFile(path)
almacenará los datos como un archivo individual en la ruta / part-00000
fuente
Lo resolví usando el siguiente enfoque (hdfs renombrar el nombre del archivo): -
Paso 1: - (Crate Data Frame y escriba en HDFS)
Paso 2: - (Crear configuración de Hadoop)
Paso 3: - (Obtener ruta en la ruta de la carpeta hdfs)
Paso 4: - (Obtenga los nombres de los archivos Spark de la carpeta hdfs)
setp5: - (crea una lista mutable de Scala para guardar todos los nombres de archivo y agregarlo a la lista)
Paso 6: - (filtrar el orden de archivos _SUCESS de la lista de nombres de archivos scala)
paso 7: - (convierta la lista de scala en una cadena y agregue el nombre de archivo deseado a la cadena de la carpeta hdfs y luego aplique el cambio de nombre)
fuente
Estoy usando esto en Python para obtener un solo archivo:
fuente
Esta respuesta amplía la respuesta aceptada, brinda más contexto y proporciona fragmentos de código que puede ejecutar en Spark Shell en su máquina.
Más contexto sobre la respuesta aceptada
La respuesta aceptada puede darle la impresión de que el código de muestra genera un solo
mydata.csv
archivo y ese no es el caso. Demostremos:Esto es lo que se genera:
NB
mydata.csv
es una carpeta en la respuesta aceptada, ¡no es un archivo!Cómo generar un solo archivo con un nombre específico
Podemos usar Spark-Daria para escribir un solo
mydata.csv
archivo.Esto generará el archivo de la siguiente manera:
Rutas S3
Deberá pasar las rutas s3a
DariaWriters.writeSingleFile
para usar este método en S3:Ver aquí para obtener más información.
Evitar copyMerge
copyMerge se eliminó de Hadoop 3. La
DariaWriters.writeSingleFile
implementación utilizafs.rename
, como se describe aquí . Spark 3 todavía usaba Hadoop 2 , por lo que las implementaciones de copyMerge funcionarán en 2020. No estoy seguro de cuándo Spark se actualizará a Hadoop 3, pero es mejor evitar cualquier enfoque de copyMerge que haga que su código se rompa cuando Spark actualice Hadoop.Código fuente
Busque el
DariaWriters
objeto en el código fuente de spark-daria si desea inspeccionar la implementación.Implementación de PySpark
Es más fácil escribir un solo archivo con PySpark porque puede convertir el DataFrame en un Pandas DataFrame que se escribe como un solo archivo por defecto.
Limitaciones
El
DariaWriters.writeSingleFile
enfoque Scala y eldf.toPandas()
enfoque Python solo funcionan para pequeños conjuntos de datos. Los conjuntos de datos enormes no se pueden escribir como archivos individuales. Escribir datos como un solo archivo no es óptimo desde la perspectiva del rendimiento porque los datos no se pueden escribir en paralelo.fuente
al usar Listbuffer podemos guardar datos en un solo archivo:
fuente
Hay una forma más de usar Java
fuente