Pregunta interesante, pasé un tiempo mirando el código para conocer los detalles y aquí están mis pensamientos. Las divisiones son manejadas por el cliente InputFormat.getSplits
, por lo que un vistazo a FileInputFormat da la siguiente información:
- Para cada archivo de entrada, conseguir la longitud del archivo, el tamaño del bloque y calcular el tamaño de división como
max(minSize, min(maxSize, blockSize))
cuando maxSize
se corresponde con mapred.max.split.size
y minSize
es mapred.min.split.size
.
Divida el archivo en diferentes FileSplit
s según el tamaño de división calculado anteriormente. Lo importante aquí es que cada uno FileSplit
se inicializa con un start
parámetro correspondiente al desplazamiento en el archivo de entrada . Todavía no hay manejo de las líneas en ese punto. La parte relevante del código se ve así:
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
Después de eso, si observa LineRecordReader
cuál está definido por TextInputFormat
, ahí es donde se manejan las líneas:
- Cuando inicializa su
LineRecordReader
, intenta crear una instancia LineReader
que es una abstracción para poder leer las líneas FSDataInputStream
. Hay 2 casos:
- Si hay un
CompressionCodec
definido, entonces este códec es responsable de manejar los límites. Probablemente no sea relevante para tu pregunta.
Sin embargo, si no hay un códec, ahí es donde las cosas son interesantes: si el start
de su InputSplit
es diferente de 0, entonces retrocede 1 carácter y luego se salta la primera línea que encuentra identificada por \ n o \ r \ n (Windows) . El retroceso es importante porque en caso de que los límites de su línea sean los mismos que los límites divididos, esto garantiza que no se salte la línea válida. Aquí está el código relevante:
if (codec != null) {
in = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
skipFirstLine = true;
--start;
fileIn.seek(start);
}
in = new LineReader(fileIn, job);
}
if (skipFirstLine) { // skip first line and re-establish "start".
start += in.readLine(new Text(), 0,
(int)Math.min((long)Integer.MAX_VALUE, end - start));
}
this.pos = start;
Entonces, dado que las divisiones se calculan en el cliente, los mapeadores no necesitan ejecutarse en secuencia, cada mapeador ya sabe si necesita descartar la primera línea o no.
Básicamente, si tiene 2 líneas de cada 100 Mb en el mismo archivo, y para simplificar, digamos que el tamaño de división es 64 Mb. Luego, cuando se calculen las divisiones de entrada, tendremos el siguiente escenario:
- Split 1 que contiene la ruta y los hosts a este bloque. Inicializado al inicio 200-200 = 0Mb, longitud 64Mb.
- Split 2 inicializado al inicio 200-200 + 64 = 64Mb, longitud 64Mb.
- Split 3 inicializado al inicio 200-200 + 128 = 128Mb, longitud 64Mb.
- Split 4 inicializado al inicio 200-200 + 192 = 192Mb, longitud 8Mb.
- El asignador A procesará la división 1, el inicio es 0, así que no se salte la primera línea y lea una línea completa que supere el límite de 64 Mb, por lo que necesita lectura remota.
- El asignador B procesará la división 2, ¡el inicio es! = 0, así que omita la primera línea después de 64Mb-1byte, que corresponde al final de la línea 1 a 100Mb que todavía está en la división 2, tenemos 28Mb de la línea en la división 2, por lo que lectura remota de los 72Mb restantes.
- El asignador C procesará la división 3, ¡el inicio es! = 0, así que omita la primera línea después de 128Mb-1byte, que corresponde al final de la línea 2 a 200Mb, que es el final del archivo, así que no haga nada.
- El mapeador D es el mismo que el mapeador C, excepto que busca una nueva línea después de 192 Mb-1byte.
LineReader.readLine
función, no creo que sea relevante para su pregunta, pero puedo agregar más detalles si es necesario.\r\n, \n
representa el truncamiento de registros)?El algoritmo Map Reduce no funciona en bloques físicos del archivo. Funciona en divisiones de entrada lógica. La división de entrada depende de dónde se escribió el registro. Un registro puede abarcar dos mapeadores.
La forma en que se ha configurado HDFS divide los archivos muy grandes en bloques grandes (por ejemplo, que miden 128 MB) y almacena tres copias de estos bloques en diferentes nodos del clúster.
HDFS no tiene conocimiento del contenido de estos archivos. Es posible que se haya iniciado un registro en el Bloque-a, pero el final de ese registro puede estar presente en el Bloque-b .
Para resolver este problema, Hadoop utiliza una representación lógica de los datos almacenados en bloques de archivos, conocidos como divisiones de entrada. Cuando un cliente de trabajo MapReduce calcula las divisiones de entrada , determina dónde comienza el primer registro completo de un bloque y dónde termina el último registro del bloque .
El punto clave :
En los casos en los que el último registro de un bloque está incompleto, la división de entrada incluye información de ubicación para el siguiente bloque y el desplazamiento de bytes de los datos necesarios para completar el registro.
Eche un vistazo al siguiente diagrama.
Eche un vistazo a este artículo y a la pregunta de SE relacionada: Acerca de la división de archivos Hadoop / HDFS
Se pueden leer más detalles en la documentación
El marco Map-Reduce se basa en el InputFormat del trabajo para:
InputSplit[] getSplits(JobConf job,int numSplits
) es la API que se encarga de estas cosas.FileInputFormat , que extiende el método
InputFormat
implementadogetSplits
(). Eche un vistazo a los aspectos internos de este método en grepcodefuente
Lo veo de la siguiente manera: InputFormat es responsable de dividir los datos en divisiones lógicas teniendo en cuenta la naturaleza de los datos.
Nada le impide hacerlo, aunque puede agregar una latencia significativa al trabajo: toda la lógica y la lectura alrededor de los límites de tamaño de división deseados se realizarán en el rastreador de trabajos.
El formato de entrada de registro más simple es TextInputFormat. Funciona de la siguiente manera (por lo que entendí del código): el formato de entrada crea divisiones por tamaño, independientemente de las líneas, pero LineRecordReader siempre:
a) Omite la primera línea en la división (o parte de ella), si no lo es la primera división
b) Lea una línea después del límite de la división al final (si hay datos disponibles, por lo que no es la última división).
fuente
Skip first line in the split (or part of it), if it is not the first split
- Si el primer registro en el que no es el primer bloque está completo, entonces no estoy seguro de cómo funcionará esta lógica.Por lo que he entendido, cuando
FileSplit
se inicializa para el primer bloque, se llama al constructor predeterminado. Por lo tanto, los valores de inicio y longitud son cero inicialmente. Al final del procesamiento del primer bloque, si la última línea está incompleta, entonces el valor de la longitud será mayor que la longitud de la división y también leerá la primera línea del siguiente bloque. Debido a esto, el valor de inicio para el primer bloque será mayor que cero y bajo esta condición,LineRecordReader
se saltará la primera línea del segundo bloque. (Ver fuente )En caso de que la última línea del primer bloque esté completa, el valor de la longitud será igual a la longitud del primer bloque y el valor del inicio del segundo bloque será cero. En ese caso
LineRecordReader
, no saltará la primera línea y leerá el segundo bloque desde el principio.¿Tiene sentido?
fuente
Desde el código fuente de hadoop de LineRecordReader.java el constructor: encuentro algunos comentarios:
de esto, creo que hadoop leerá una línea adicional para cada división (al final de la división actual, leerá la siguiente línea en la siguiente división), y si no es la primera división, la primera línea se descartará. para que ningún registro de línea se pierda o esté incompleto
fuente
Los cartógrafos no tienen que comunicarse. Los bloques de archivos están en HDFS y el mapeador actual (RecordReader) puede leer el bloque que tiene la parte restante de la línea. Esto sucede entre bastidores.
fuente