Quiero leer un montón de archivos de texto desde una ubicación hdfs y realizar un mapeo en él en una iteración usando spark.
JavaRDD<String> records = ctx.textFile(args[1], 1);
es capaz de leer solo un archivo a la vez.
Quiero leer más de un archivo y procesarlos como un único RDD. ¿Cómo?
apache-spark
usuario3705662
fuente
fuente
Path
que se aplican las mismas opciones.sc.wholeTextFiles
es útil para datos que no están delimitados por líneassc.textFile(multipleCommaSeparatedDirs,320)
que conduce a19430
las tareas totales en lugar de320
... se comporta comounion
que también conduce a una locura número de tareas de muy baja paralelismowholeTextFiles
. ¿Cuál es su caso de uso? Puedo pensar en una solución alternativa siempre que use la misma cantidad de particiones que archivos ...Use de la
union
siguiente manera:Entonces el
bigRdd
es el RDD con todos los archivos.fuente
Puede usar una sola llamada textFile para leer varios archivos. Scala:
fuente
sc.textFile(files.mkString(","))
Puedes usar esto
Primero puede obtener un Buffer / Lista de rutas S3:
Ahora pase este objeto List al siguiente fragmento de código, tenga en cuenta: sc es un objeto de SQLContext
Ahora tienes un RDD unificado final, es decir, df
Opcional, y también puede repartirlo en un solo BigRDD
Reparticionar siempre funciona: D
fuente
En PySpark, he encontrado una forma útil adicional de analizar archivos. Quizás haya un equivalente en Scala, pero no me siento lo suficientemente cómodo como para encontrar una traducción que funcione. Es, en efecto, una llamada textFile con la adición de etiquetas (en el ejemplo a continuación, la clave = nombre de archivo, valor = 1 línea del archivo).
Archivo de texto "etiquetado"
entrada:
salida: matriz con cada entrada que contiene una tupla usando filename-as-key y con value = cada línea de archivo. (Técnicamente, con este método también puede usar una clave diferente además del nombre real de la ruta del archivo, tal vez una representación hash para guardar en la memoria). es decir.
También puede recombinarse como una lista de líneas:
Spark_Full.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()
O recombine archivos completos de nuevo a cadenas individuales (en este ejemplo, el resultado es el mismo que el que obtiene de wholeTextFiles, pero con la cadena "file:" eliminada de la ruta de archivo):
Spark_Full.groupByKey().map(lambda x: (x[0], ' '.join(list(x[1])))).collect()
fuente
Spark_Full += sc.textFile(filename).keyBy(lambda x: filename)
recibí el error, es decirTypeError: 'PipelinedRDD' object is not iterable
. Tengo entendido que esa línea crea un RDD que es inmutable, por lo que me preguntaba cómo fue capaz de agregarlo a otra variable.puedes usar
aquí obtendrá la ruta de su archivo y el contenido de ese archivo. para que pueda realizar cualquier acción de un archivo completo a la vez que ahorre la sobrecarga
fuente
Todas las respuestas son correctas con
sc.textFile
Me preguntaba por qué no.
wholeTextFiles
Por ejemplo, en este caso ...Una limitación es que tenemos que cargar archivos pequeños, de lo contrario el rendimiento será malo y puede conducir a OOM.
Nota :
Más referencia para visitar
fuente
sc.wholeTextFiles(folder).flatMap...
Hay una solución limpia directa disponible. Use el método wholeTextFiles (). Esto tomará un directorio y formará un par de valores clave. El RDD devuelto será un par RDD. Encuentra debajo la descripción de Spark docs :
fuente
PRUEBA ESTO interfaz utilizada para escribir un DataFrame en sistemas de almacenamiento externo (por ejemplo, sistemas de archivos, almacenes de valores clave, etc.). Use DataFrame.write () para acceder a esto.
Nuevo en la versión 1.4.
csv (ruta, modo = Ninguno, compresión = Ninguno, sep = Ninguno, comilla = Ninguno, escape = Ninguno, encabezado = Ninguno, nullValue = Ninguno, escapeQuotes = Ninguno, quoteAll = Ninguno, dateFormat = Ninguno, timestampFormat = Ninguno) Guarda el contenido del DataFrame en formato CSV en la ruta especificada.
Parámetros: ruta - la ruta en cualquier modo de sistema de archivos compatible con Hadoop - especifica el comportamiento de la operación de guardar cuando los datos ya existen.
append: añade contenido de este DataFrame a los datos existentes. sobrescribir: sobrescribe los datos existentes. ignorar: ignore silenciosamente esta operación si ya existen datos. error (caso predeterminado): arroje una excepción si los datos ya existen. compresión - códec de compresión para usar al guardar en un archivo. Este puede ser uno de los nombres abreviados que no distinguen entre mayúsculas y minúsculas (none, bzip2, gzip, lz4, snappy and deflate). sep: establece el carácter único como separador para cada campo y valor. Si Ninguno está configurado, usa el valor predeterminado,,. quote: establece el carácter único utilizado para escapar de los valores entre comillas donde el separador puede ser parte del valor. Si Ninguno está configurado, utiliza el valor predeterminado, ". Si desea desactivar las comillas, debe establecer una cadena vacía. Escape: establece el carácter único utilizado para escapar de las comillas dentro de un valor ya citado. Si Ninguno está configurado , utiliza el valor predeterminado, \ escapeQuotes: una marca que indica si los valores que contienen comillas siempre deben estar entre comillas. Si Ninguno está configurado, utiliza el valor predeterminado verdadero, escapando a todos los valores que contienen un carácter de comillas. quoteAll: una marca que indica si todos los valores siempre deben estar entre comillas. Si se establece Ninguno, utiliza el valor predeterminado falso, solo escapa los valores que contienen un carácter de comillas. encabezado: escribe los nombres de las columnas como la primera línea. Si Ninguno está configurado, utiliza el valor predeterminado, falso. nullValue: establece la representación de cadena de un valor nulo. Si Ninguno está configurado, utiliza el valor predeterminado, cadena vacía. dateFormat: establece la cadena que indica un formato de fecha. Los formatos de fecha personalizados siguen los formatos en java.text.SimpleDateFormat. Esto se aplica al tipo de fecha. Si Ninguno está configurado, utiliza el valor del valor predeterminado, aaaa-MM-dd. timestampFormat: establece la cadena que indica un formato de marca de tiempo. Los formatos de fecha personalizados siguen los formatos en java.text.SimpleDateFormat. Esto se aplica al tipo de marca de tiempo. Si se establece Ninguno, utiliza el valor del valor predeterminado, aaaa-MM-dd'T'HH: mm: ss.SSSZZ.
fuente
fuente