Soy nuevo en Spark y estoy tratando de leer datos CSV de un archivo con Spark. Esto es lo que estoy haciendo:
sc.textFile('file.csv')
.map(lambda line: (line.split(',')[0], line.split(',')[1]))
.collect()
Esperaría que esta llamada me diera una lista de las dos primeras columnas de mi archivo, pero recibo este error:
File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range
aunque mi archivo CSV tiene más de una columna.
python
csv
apache-spark
pyspark
Kernael
fuente
fuente
csv
biblioteca incorporada para manejar todos los escapes porque simplemente dividir por comas no funcionará si, digamos, hay comas en los valores.","
.Spark 2.0.0+
Puede utilizar la fuente de datos csv incorporada directamente:
o
sin incluir dependencias externas.
Chispa <2.0.0 :
En lugar del análisis manual, que está lejos de ser trivial en un caso general, recomendaría
spark-csv
:Asegúrese de que CSV Spark está incluido en la ruta de acceso (
--packages
,--jars
,--driver-class-path
)Y cargue sus datos de la siguiente manera:
Puede manejar la carga, la inferencia del esquema, la eliminación de líneas mal formadas y no requiere pasar datos de Python a la JVM.
Nota :
Si conoce el esquema, es mejor evitar la inferencia del esquema y pasarlo a
DataFrameReader
. Suponiendo que tiene tres columnas: entero, doble y cadena:fuente
pyspark --packages com.databricks:spark-csv_2.11:1.4.0
(asegúrese de cambiar las versiones de databricks / spark a las que ha instalado).fuente
Y otra opción más que consiste en leer el archivo CSV usando Pandas y luego importar el Pandas DataFrame a Spark.
Por ejemplo:
fuente
Simplemente dividir por comas también dividirá las comas que están dentro de los campos (por ejemplo
a,b,"1,2,3",c
), por lo que no se recomienda. La respuesta de zero323 es buena si desea utilizar la API de DataFrames, pero si desea ceñirse a la base Spark, puede analizar csvs en base Python con el módulo csv :EDITAR: Como @muon mencionó en los comentarios, esto tratará el encabezado como cualquier otra fila, por lo que deberá extraerlo manualmente. Por ejemplo,
header = rdd.first(); rdd = rdd.filter(lambda x: x != header)
(asegúrese de no modificarheader
antes de que se evalúe el filtro). Pero en este punto, probablemente sea mejor que utilice un analizador csv incorporado.fuente
StringIO
.csv
puede usar cualquier iterable b)__next__
no debe usarse directamente y fallará en la línea vacía. Eche un vistazo a flatMap c) Sería mucho más eficiente de usar enmapPartitions
lugar de inicializar el lector en cada línea :)rdd.mapPartitions(lambda x: csv.reader(x))
funciona mientrasrdd.map(lambda x: csv.reader(x))
arroja un error? Esperaba que ambos lanzaran lo mismoTypeError: can't pickle _csv.reader objects
. También parece quemapPartitions
llama automáticamente a algunos equivalentes a "readlines" en elcsv.reader
objeto, donde conmap
, necesitaba llamar__next__
explícitamente para sacar las listas decsv.reader
. 2) ¿DóndeflatMap
entra? LlamarmapPartitions
solo funcionó para mí.rdd.mapPartitions(lambda x: csv.reader(x))
funciona porquemapPartitions
espera unIterable
objeto. Si quieres ser explícito, puedes comprensión o expresión generadora.map
solo no funciona porque no itera sobre el objeto. De ahí mi sugerencia de uso,flatMap(lambda x: csv.reader([x]))
que iterará sobre el lector. PeromapPartitions
aquí es mucho mejor.Esto está en PYSPARK
Entonces puedes comprobar
fuente
Si desea cargar csv como un marco de datos, puede hacer lo siguiente:
Funcionó bien para mí.
fuente
Esto está en línea con lo que JP Mercier sugirió inicialmente sobre el uso de Pandas, pero con una modificación importante: si lee datos en Pandas en trozos, debería ser más maleable. Lo que significa que puede analizar un archivo mucho más grande de lo que Pandas realmente puede manejar como una sola pieza y pasarlo a Spark en tamaños más pequeños. (Esto también responde al comentario sobre por qué uno querría usar Spark si de todos modos pueden cargar todo en Pandas).
fuente
Ahora, también hay otra opción para cualquier archivo csv general: https://github.com/seahboonsiew/pyspark-csv de la siguiente manera:
Supongamos que tenemos el siguiente contexto
Primero, distribuya pyspark-csv.py a los ejecutores usando SparkContext
Leer datos csv a través de SparkContext y convertirlos a DataFrame
fuente
Si sus datos csv no contienen líneas nuevas en ninguno de los campos, puede cargar sus datos
textFile()
y analizarlosfuente
Si tiene una o más filas con menos o más columnas que 2 en el conjunto de datos, puede surgir este error.
También soy nuevo en Pyspark y estoy tratando de leer un archivo CSV. El siguiente código funcionó para mí:
En este código, estoy usando un conjunto de datos de kaggle, el enlace es: https://www.kaggle.com/carrie1/ecommerce-data
1. Sin mencionar el esquema:
Ahora verifique las columnas: sdfData.columns
La salida será:
Verifique el tipo de datos para cada columna:
Esto dará el marco de datos con todas las columnas con tipo de datos como StringType
2. Con esquema: si conoce el esquema o desea cambiar el tipo de datos de cualquier columna en la tabla anterior, use esto (digamos que tengo las siguientes columnas y las quiero en un tipo de datos particular para cada una de ellas)
Ahora verifique el esquema para el tipo de datos de cada columna:
Editado: También podemos usar la siguiente línea de código sin mencionar el esquema explícitamente:
La salida es:
La salida se verá así:
fuente
Cuando lo uso
spark.read.csv
, encuentro que usar las opcionesescape='"'
ymultiLine=True
brindar la solución más consistente para el estándar CSV , y en mi experiencia, funciona mejor con archivos CSV exportados desde Google Sheets.Es decir,
fuente
import pyspark as spark
?spark
ya está inicializado. En un script enviado porspark-submit
, puede crear una instancia comofrom pyspark.sql import SparkSession; spark = SparkSession.builder.getOrCreate()
.