Me gustaría leer un CSV en chispa y convertirlo como DataFrame y almacenarlo en HDFS con df.registerTempTable("table_name")
Yo he tratado:
scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
Error que obtuve:
java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
¿Cuál es el comando correcto para cargar el archivo CSV como DataFrame en Apache Spark?
Respuestas:
spark-csv es parte de la funcionalidad central de Spark y no requiere una biblioteca separada. Entonces podrías hacer por ejemplo
En scala, (esto funciona para cualquier delimitador de formato en mención "," para csv, "\ t" para tsv, etc.)
val df = sqlContext.read.format("com.databricks.spark.csv") .option("delimiter", ",") .load("csvfile.csv")
fuente
Analiza CSV y carga como DataFrame / DataSet con Spark 2.x
Primero, inicialice el
SparkSession
objeto por defecto, estará disponible en shells comospark
1. Hazlo de manera programática
Actualización: Agregar todas las opciones desde aquí en caso de que el enlace se rompa en el futuro
2. También puedes hacer esta manera SQL
Dependencias :
Versión Spark <2.0
Dependencias:
fuente
spark-core_2.11
yspark-sql_2.11
de2.0.1
versión está bien. Si es posible, agregue el mensaje de error.spark.read.format("csv").option("delimiter ", "|") ...
programmatic way
es dejar fuera.format("csv")
y reemplazar.load(...
con.csv(...
. Eloption
método pertenece a la clase DataFrameReader que devuelve elread
método, donde los métodosload
ycsv
devuelven un marco de datos, por lo que no se pueden etiquetar las opciones después de que se invocan. Esta respuesta es bastante exhaustiva, pero debe vincular a la documentación para que las personas puedan ver todas las otras opciones de CSV disponibles spark.apache.org/docs/latest/api/scala/… *): org.apache.spark.sql.DataFrameEs para cuyo Hadoop es 2.6 y Spark es 1.6 y sin el paquete "databricks".
fuente
Con Spark 2.0, lo siguiente es cómo puedes leer CSV
fuente
spark.read.csv(path)
yspark.read.format("csv").load(path)
?En Java 1.8, este fragmento de código funciona perfectamente para leer archivos CSV
POM.xml
Java
fuente
Hay muchos desafíos para analizar un archivo CSV, se sigue sumando si el tamaño del archivo es mayor, si hay caracteres que no están en inglés / escape / separador / otros en los valores de la columna, que podrían causar errores de análisis.
La magia está en las opciones que se usan. Los que funcionaron para mí y espero que cubran la mayoría de los casos límite están en el siguiente código:
Espero que ayude. Para obtener más información, consulte: Uso de PySpark 2 para leer CSV con código fuente HTML
Nota: El código anterior es de la API de Spark 2, donde la API de lectura de archivos CSV viene incluida con paquetes integrados de Spark instalables.
Nota: PySpark es un contenedor de Python para Spark y comparte la misma API que Scala / Java.
fuente
El ejemplo de Penny's Spark 2 es la forma de hacerlo en spark2. Hay un truco más: tiene que encabezado generado por usted al hacer una exploración inicial de los datos, mediante el establecimiento de la opción
inferSchema
detrue
Aquí, entonces, suponiendo que se
spark
trata de una sesión de chispa que ha configurado, es la operación para cargar en el archivo de índice CSV de todas las imágenes de Landsat que aloja Amazon en S3.La mala noticia es: esto desencadena un escaneo a través del archivo; para algo grande como este archivo CSV comprimido de más de 20 MB, que puede demorar 30 segundos en una conexión de larga distancia. Tenga esto en cuenta: es mejor que codifique manualmente el esquema una vez que lo haya introducido.
(fragmento de código Licencia de software Apache 2.0 con licencia para evitar toda ambigüedad; algo que he hecho como prueba de demostración / integración de integración S3)
fuente
En caso de que esté construyendo un jar con scala 2.11 y Apache 2.0 o superior.
No hay necesidad de crear una
sqlContext
osparkContext
objetar. Solo unSparkSession
objeto es suficiente para todas las necesidades.Lo siguiente es mycode que funciona bien:
En caso de que esté ejecutando en un clúster, simplemente cambie
.master("local")
a.master("yarn")
mientras define elsparkBuilder
objetoEl Spark Doc cubre esto: https://spark.apache.org/docs/2.2.0/sql-programming-guide.html
fuente
Agregue las siguientes dependencias de Spark al archivo POM:
// Configuración de chispa:
val spark = SparkSession.builder (). master ("local"). appName ("Aplicación de muestra"). getOrCreate ()
// Leer archivo csv:
val df = spark.read.option ("encabezado", "verdadero"). csv ("FILE_PATH")
// Mostrar salida
df.show ()
fuente
Para leer desde la ruta relativa en el sistema, use el método System.getProperty para obtener el directorio actual y otros usos para cargar el archivo usando la ruta relativa.
chispa: 2.4.4 escala: 2.11.12
fuente
Con Spark 2.4+, si desea cargar un csv desde un directorio local, puede usar 2 sesiones y cargarlo en la colmena. La primera sesión debe crearse con la configuración master () como "local [*]" y la segunda sesión con "yarn" y Hive habilitados.
El de abajo funcionó para mí.
Cuando se ejecutó
spark2-submit --master "yarn" --conf spark.ui.enabled=false testCSV.jar
, salió bien y creó la mesa en la colmena.fuente
El formato de archivo predeterminado es Parquet con spark.read .. y la lectura de archivos csv es la razón por la que obtiene la excepción. Especifique el formato csv con la api que está intentando usar
fuente
Prueba esto si usas spark 2.0+
Nota: este trabajo para cualquier archivo delimitado. Simplemente use la opción ("delimitador",) para cambiar el valor.
Espero que esto sea útil.
fuente
Con Spark csv incorporado, puede hacerlo fácilmente con el nuevo objeto SparkSession para Spark> 2.0.
Hay varias opciones que puede configurar.
header
: si su archivo incluye una línea de encabezado en la parte superiorinferSchema
: si desea inferir el esquema automáticamente o no. Por defecto estrue
. Siempre prefiero proporcionar un esquema para garantizar los tipos de datos adecuados.mode
: modo de análisis, PERMISO, DROPMALFORMED o FAILFASTdelimiter
: para especificar el delimitador, el valor predeterminado es una coma (',')fuente