En muchas situaciones de la vida real en las que aplica MapReduce, los algoritmos finales terminan siendo varios pasos de MapReduce.
es decir, Mapa1, Reducir1, Mapa2, Reducir2, etc.
Entonces tiene la salida de la última reducción que se necesita como entrada para el siguiente mapa.
Los datos intermedios son algo que (en general) no desea conservar una vez que la canalización se ha completado con éxito. Además, debido a que estos datos intermedios son, en general, una estructura de datos (como un 'mapa' o un 'conjunto'), no desea poner demasiado esfuerzo en escribir y leer estos pares clave-valor.
¿Cuál es la forma recomendada de hacerlo en Hadoop?
¿Hay un ejemplo (simple) que muestre cómo manejar estos datos intermedios de la manera correcta, incluida la limpieza posterior?
Respuestas:
Creo que este tutorial en la red de desarrolladores de Yahoo te ayudará con esto: encadenando trabajos
Usas el
JobClient.runJob()
. La ruta de salida de los datos del primer trabajo se convierte en la ruta de entrada a su segundo trabajo. Estos deben pasarse como argumentos a sus trabajos con el código apropiado para analizarlos y configurar los parámetros para el trabajo.Sin embargo, creo que el método anterior podría ser la forma en que lo hizo la API mapeada más antigua, pero aún así debería funcionar. Habrá un método similar en la nueva API mapreduce, pero no estoy seguro de qué es.
En cuanto a eliminar datos intermedios después de que un trabajo haya terminado, puede hacerlo en su código. La forma en que lo hice antes es usar algo como:
Donde la ruta es la ubicación en HDFS de los datos. Debe asegurarse de eliminar solo estos datos una vez que ningún otro trabajo lo requiera.
fuente
Hay muchas formas de hacerlo.
(1) Trabajos en cascada
Cree el objeto JobConf "job1" para el primer trabajo y configure todos los parámetros con "input" como inputdirectory y "temp" como directorio de salida. Ejecute este trabajo:
Inmediatamente debajo de él, cree el objeto JobConf "job2" para el segundo trabajo y configure todos los parámetros con "temp" como directorio de entrada y "output" como directorio de salida. Ejecute este trabajo:
(2) Cree dos objetos JobConf y establezca todos los parámetros en ellos como (1), excepto que no use JobClient.run.
Luego cree dos objetos Job con jobconfs como parámetros:
Usando el objeto jobControl, usted especifica las dependencias del trabajo y luego ejecuta los trabajos:
(3) Si necesita una estructura similar a Map + | Reducir | Map *, puede usar las clases ChainMapper y ChainReducer que vienen con Hadoop versión 0.19 y posteriores.
fuente
En realidad, hay varias maneras de hacer esto. Me enfocaré en dos.
Una es a través de Riffle ( http://github.com/cwensel/riffle ) una biblioteca de anotaciones para identificar cosas dependientes y 'ejecutarlas' en orden de dependencia (topológico).
O puede usar una cascada (y MapReduceFlow) en cascada ( http://www.cascading.org/ ). Una versión futura admitirá anotaciones de Riffle, pero ahora funciona muy bien con trabajos de MR JobConf sin procesar.
Una variante de esto es no administrar los trabajos de MR a mano, sino desarrollar su aplicación utilizando la API en cascada. Luego, JobConf y el encadenamiento de trabajos se manejan internamente a través del planificador en cascada y las clases de flujo.
De esta manera, pasa su tiempo enfocándose en su problema, no en la mecánica de administrar los trabajos de Hadoop, etc. Incluso puede superponer diferentes idiomas (como clojure o jruby) para simplificar aún más su desarrollo y aplicaciones. http://www.cascading.org/modules.html
fuente
He realizado el encadenamiento de trabajos utilizando objetos JobConf uno tras otro. Tomé el ejemplo de WordCount para encadenar los trabajos. Un trabajo calcula cuántas veces se repite una palabra en la salida dada. El segundo trabajo toma la salida del primer trabajo como entrada y calcula el total de palabras en la entrada dada. A continuación se muestra el código que debe colocarse en la clase Driver.
El comando para ejecutar estos trabajos es:
bin / hadoop jar TotalWords.
Necesitamos dar el nombre de los trabajos finales para el comando. En el caso anterior, es TotalWords.
fuente
Puede ejecutar la cadena MR de la manera indicada en el código.
TENGA EN CUENTA : solo se ha proporcionado el código del controlador
LA SECUENCIA ES
( TRABAJO1 ) MAPA- > REDUCIR-> ( TRABAJO2 ) MAPA
Esto se hizo para ordenar las claves, pero hay más formas de usar un mapa de árbol.
Sin embargo, ¡quiero centrar su atención en la forma en que se encadenaron los Trabajos! !
Gracias
fuente
Puede usar oozie para procesar en barch sus trabajos de MapReduce. http://issues.apache.org/jira/browse/HADOOP-5303
fuente
Hay ejemplos en el proyecto Apache Mahout que encadenan múltiples trabajos de MapReduce. Uno de los ejemplos se puede encontrar en:
RecomendadorJob.java
http://search-lucene.com/c/Mahout:/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java%7C%7CRecommenderJob
fuente
Podemos hacer uso del
waitForCompletion(true)
método del Trabajo para definir la dependencia entre el trabajo.En mi caso, tenía 3 trabajos que dependían unos de otros. En la clase de controlador utilicé el siguiente código y funciona como se esperaba.
fuente
La nueva clase org.apache.hadoop.mapreduce.lib.chain.ChainMapper ayuda a este escenario
fuente
Aunque existen complejos motores de flujo de trabajo de Hadoop basados en servidores, por ejemplo, oozie, tengo una biblioteca simple de Java que permite la ejecución de múltiples trabajos de Hadoop como flujo de trabajo. La configuración del trabajo y el flujo de trabajo que define la dependencia entre trabajos se configura en un archivo JSON. Todo es configurable externamente y no requiere ningún cambio en el mapa existente, lo que reduce la implementación para formar parte de un flujo de trabajo.
Detalles pueden ser encontrados aqui. El código fuente y el jar están disponibles en github.
http://pkghosh.wordpress.com/2011/05/22/hadoop-orchestration/
Pranab
fuente
Creo que oozie ayuda a los trabajos consiguientes a recibir las entradas directamente del trabajo anterior. Esto evita la operación de E / S realizada con control de trabajo.
fuente
Si desea encadenar sus trabajos mediante programación, deberá usar JobControl. El uso es bastante simple:
Después de eso, agrega instancias de ControlledJob. ControlledJob define un trabajo con sus dependencias, conectando automáticamente entradas y salidas para adaptarse a una "cadena" de trabajos.
comienza la cadena Querrás poner eso en un hilo de speerate. Esto permite verificar el estado de su cadena mientras se ejecuta:
fuente
Como ha mencionado en su requisito de que desea que o / p de MRJob1 sea el i / p de MRJob2, etc., puede considerar usar el flujo de trabajo de oozie para este caso de uso. También puede considerar escribir sus datos intermedios en HDFS, ya que serán utilizados por el próximo MRJob. Y una vez que se complete el trabajo, puede limpiar sus datos intermedios.
fuente