¿Cómo procesa Hadoop los registros divididos entre los límites de los bloques?

119

De acuerdo con la Hadoop - The Definitive Guide

Los registros lógicos que define FileInputFormats no suelen encajar perfectamente en los bloques HDFS. Por ejemplo, los registros lógicos de TextInputFormat son líneas, que cruzarán los límites de HDFS la mayoría de las veces. Esto no influye en el funcionamiento de su programa (las líneas no se pierden ni se rompen, por ejemplo) pero vale la pena conocerlo, ya que significa que los mapas locales de datos (es decir, mapas que se ejecutan en el mismo host que su datos de entrada) realizará algunas lecturas remotas. La ligera sobrecarga que esto causa normalmente no es significativa.

Suponga que una línea de registro se divide en dos bloques (b1 y b2). El asignador que procesa el primer bloque (b1) notará que la última línea no tiene un separador de EOL y recuperará el resto de la línea del siguiente bloque de datos (b2).

¿Cómo determina el mapeador que procesa el segundo bloque (b2) que el primer registro está incompleto y debe procesar a partir del segundo registro del bloque (b2)?

Praveen Sripati
fuente

Respuestas:

160

Pregunta interesante, pasé un tiempo mirando el código para conocer los detalles y aquí están mis pensamientos. Las divisiones son manejadas por el cliente InputFormat.getSplits, por lo que un vistazo a FileInputFormat da la siguiente información:

  • Para cada archivo de entrada, conseguir la longitud del archivo, el tamaño del bloque y calcular el tamaño de división como max(minSize, min(maxSize, blockSize))cuando maxSizese corresponde con mapred.max.split.sizey minSizees mapred.min.split.size.
  • Divida el archivo en diferentes FileSplits según el tamaño de división calculado anteriormente. Lo importante aquí es que cada uno FileSplitse inicializa con un startparámetro correspondiente al desplazamiento en el archivo de entrada . Todavía no hay manejo de las líneas en ese punto. La parte relevante del código se ve así:

    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                               blkLocations[blkIndex].getHosts()));
      bytesRemaining -= splitSize;
    }
    

Después de eso, si observa LineRecordReadercuál está definido por TextInputFormat, ahí es donde se manejan las líneas:

  • Cuando inicializa su LineRecordReader, intenta crear una instancia LineReaderque es una abstracción para poder leer las líneas FSDataInputStream. Hay 2 casos:
  • Si hay un CompressionCodecdefinido, entonces este códec es responsable de manejar los límites. Probablemente no sea relevante para tu pregunta.
  • Sin embargo, si no hay un códec, ahí es donde las cosas son interesantes: si el startde su InputSplites diferente de 0, entonces retrocede 1 carácter y luego se salta la primera línea que encuentra identificada por \ n o \ r \ n (Windows) . El retroceso es importante porque en caso de que los límites de su línea sean los mismos que los límites divididos, esto garantiza que no se salte la línea válida. Aquí está el código relevante:

    if (codec != null) {
       in = new LineReader(codec.createInputStream(fileIn), job);
       end = Long.MAX_VALUE;
    } else {
       if (start != 0) {
         skipFirstLine = true;
         --start;
         fileIn.seek(start);
       }
       in = new LineReader(fileIn, job);
    }
    if (skipFirstLine) {  // skip first line and re-establish "start".
      start += in.readLine(new Text(), 0,
                        (int)Math.min((long)Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
    

Entonces, dado que las divisiones se calculan en el cliente, los mapeadores no necesitan ejecutarse en secuencia, cada mapeador ya sabe si necesita descartar la primera línea o no.

Básicamente, si tiene 2 líneas de cada 100 Mb en el mismo archivo, y para simplificar, digamos que el tamaño de división es 64 Mb. Luego, cuando se calculen las divisiones de entrada, tendremos el siguiente escenario:

  • Split 1 que contiene la ruta y los hosts a este bloque. Inicializado al inicio 200-200 = 0Mb, longitud 64Mb.
  • Split 2 inicializado al inicio 200-200 + 64 = 64Mb, longitud 64Mb.
  • Split 3 inicializado al inicio 200-200 + 128 = 128Mb, longitud 64Mb.
  • Split 4 inicializado al inicio 200-200 + 192 = 192Mb, longitud 8Mb.
  • El asignador A procesará la división 1, el inicio es 0, así que no se salte la primera línea y lea una línea completa que supere el límite de 64 Mb, por lo que necesita lectura remota.
  • El asignador B procesará la división 2, ¡el inicio es! = 0, así que omita la primera línea después de 64Mb-1byte, que corresponde al final de la línea 1 a 100Mb que todavía está en la división 2, tenemos 28Mb de la línea en la división 2, por lo que lectura remota de los 72Mb restantes.
  • El asignador C procesará la división 3, ¡el inicio es! = 0, así que omita la primera línea después de 128Mb-1byte, que corresponde al final de la línea 2 a 200Mb, que es el final del archivo, así que no haga nada.
  • El mapeador D es el mismo que el mapeador C, excepto que busca una nueva línea después de 192 Mb-1byte.
Charles Menguy
fuente
También @PraveenSripati, vale la pena mencionar que los casos extremos en los que un límite estaría en \ r en una devolución \ r \ n se manejan en la LineReader.readLinefunción, no creo que sea relevante para su pregunta, pero puedo agregar más detalles si es necesario.
Charles Menguy
Supongamos que hay dos líneas con 64 MB exactos en la entrada y, por lo tanto, los InputSplits ocurren exactamente en los límites de la línea. Entonces, ¿el mapeador siempre ignorará la línea en el segundo bloque porque start! = 0.
Praveen Sripati
6
@PraveenSripati En ese caso, el segundo mapeador verá start! = 0, así que retroceda 1 carácter, que lo lleva de regreso justo antes del \ n de la primera línea y luego salte al siguiente \ n. Por lo tanto, omitirá la primera línea pero procesará la segunda línea como se esperaba.
Charles Menguy
@CharlesMenguy ¿es posible que la primera línea del archivo se salte de alguna manera? Concretamente, tengo la primera línea con key = 1, y valor a, luego hay dos líneas más con la misma clave en algún lugar del archivo, key = 1, val = b y key = 1, val = c. El caso es que mi reductor obtiene {1, [b, c]} y {1, [a]}, en lugar de {1, [a, b, c]}. Esto no sucede si agrego una nueva línea al principio de mi archivo. ¿Cuál podría ser la razón, señor?
Kobe-Wan Kenobi
@CharlesMenguy ¿Qué pasa si el archivo en HDFS es un archivo binario (a diferencia de un archivo de texto, en el que \r\n, \nrepresenta el truncamiento de registros)?
CᴴᴀZ
17

El algoritmo Map Reduce no funciona en bloques físicos del archivo. Funciona en divisiones de entrada lógica. La división de entrada depende de dónde se escribió el registro. Un registro puede abarcar dos mapeadores.

La forma en que se ha configurado HDFS divide los archivos muy grandes en bloques grandes (por ejemplo, que miden 128 MB) y almacena tres copias de estos bloques en diferentes nodos del clúster.

HDFS no tiene conocimiento del contenido de estos archivos. Es posible que se haya iniciado un registro en el Bloque-a, pero el final de ese registro puede estar presente en el Bloque-b .

Para resolver este problema, Hadoop utiliza una representación lógica de los datos almacenados en bloques de archivos, conocidos como divisiones de entrada. Cuando un cliente de trabajo MapReduce calcula las divisiones de entrada , determina dónde comienza el primer registro completo de un bloque y dónde termina el último registro del bloque .

El punto clave :

En los casos en los que el último registro de un bloque está incompleto, la división de entrada incluye información de ubicación para el siguiente bloque y el desplazamiento de bytes de los datos necesarios para completar el registro.

Eche un vistazo al siguiente diagrama.

ingrese la descripción de la imagen aquí

Eche un vistazo a este artículo y a la pregunta de SE relacionada: Acerca de la división de archivos Hadoop / HDFS

Se pueden leer más detalles en la documentación

El marco Map-Reduce se basa en el InputFormat del trabajo para:

  1. Valide la especificación de entrada del trabajo.
  2. Divida el (los) archivo (s) de entrada en InputSplits lógicos, cada uno de los cuales se asigna a un asignador individual.
  3. Luego, cada InputSplit se asigna a un asignador individual para su procesamiento. Split podría ser una tupla . InputSplit[] getSplits(JobConf job,int numSplits) es la API que se encarga de estas cosas.

FileInputFormat , que extiende el método InputFormatimplementado getSplits(). Eche un vistazo a los aspectos internos de este método en grepcode

Ravindra babu
fuente
7

Lo veo de la siguiente manera: InputFormat es responsable de dividir los datos en divisiones lógicas teniendo en cuenta la naturaleza de los datos.
Nada le impide hacerlo, aunque puede agregar una latencia significativa al trabajo: toda la lógica y la lectura alrededor de los límites de tamaño de división deseados se realizarán en el rastreador de trabajos.
El formato de entrada de registro más simple es TextInputFormat. Funciona de la siguiente manera (por lo que entendí del código): el formato de entrada crea divisiones por tamaño, independientemente de las líneas, pero LineRecordReader siempre:
a) Omite la primera línea en la división (o parte de ella), si no lo es la primera división
b) Lea una línea después del límite de la división al final (si hay datos disponibles, por lo que no es la última división).

David Gruzman
fuente
Skip first line in the split (or part of it), if it is not the first split- Si el primer registro en el que no es el primer bloque está completo, entonces no estoy seguro de cómo funcionará esta lógica.
Praveen Sripati
Por lo que veo el código, cada división lee lo que tiene + la siguiente línea. Entonces, si el salto de línea no está en el límite del bloque, está bien. ¿Cómo se maneja exactamente el caso cuando el salto de línea está exactamente en el límite del bloque? Hay que entenderlo. Leeré el código un poco más
David Gruzman
3

Por lo que he entendido, cuando FileSplitse inicializa para el primer bloque, se llama al constructor predeterminado. Por lo tanto, los valores de inicio y longitud son cero inicialmente. Al final del procesamiento del primer bloque, si la última línea está incompleta, entonces el valor de la longitud será mayor que la longitud de la división y también leerá la primera línea del siguiente bloque. Debido a esto, el valor de inicio para el primer bloque será mayor que cero y bajo esta condición, LineRecordReaderse saltará la primera línea del segundo bloque. (Ver fuente )

En caso de que la última línea del primer bloque esté completa, el valor de la longitud será igual a la longitud del primer bloque y el valor del inicio del segundo bloque será cero. En ese caso LineRecordReader, no saltará la primera línea y leerá el segundo bloque desde el principio.

¿Tiene sentido?

aa8y
fuente
2
En este escenario, los mapeadores deben comunicarse entre sí y procesar los bloques en secuencia cuando la última línea de un bloque en particular no está completa. No estoy seguro si esta es la forma en que funciona.
Praveen Sripati
1

Desde el código fuente de hadoop de LineRecordReader.java el constructor: encuentro algunos comentarios:

// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
  start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;

de esto, creo que hadoop leerá una línea adicional para cada división (al final de la división actual, leerá la siguiente línea en la siguiente división), y si no es la primera división, la primera línea se descartará. para que ningún registro de línea se pierda o esté incompleto

Shenghai.Geng
fuente
0

Los cartógrafos no tienen que comunicarse. Los bloques de archivos están en HDFS y el mapeador actual (RecordReader) puede leer el bloque que tiene la parte restante de la línea. Esto sucede entre bastidores.

usuario3507308
fuente