¿Por qué fallan los trabajos de Spark con org.apache.spark.shuffle.MetadataFetchFailedException: falta una ubicación de salida para shuffle 0 en modo de especulación?

85

Estoy ejecutando un trabajo de Spark en modo de especulación. Tengo alrededor de 500 tareas y alrededor de 500 archivos de 1 GB gz comprimidos. Sigo recibiendo en cada trabajo, para 1-2 tareas, el error adjunto donde se repite luego decenas de veces (evitando que el trabajo se complete).

org.apache.spark.shuffle.MetadataFetchFailedException: Falta una ubicación de salida para shuffle 0

¿Alguna idea de cuál es el significado del problema y cómo solucionarlo?

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:384)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:381)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
    at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)
dotan
fuente
1
¿Ha visto algún LostExecutormensaje INFO? ¿Puede consultar la página de Ejecutores de la interfaz de usuario web y ver cómo se comportan los ejecutores, esp. ¿GC-sabio?
Jacek Laskowski

Respuestas:

50

Esto me sucedió cuando le di más memoria al nodo trabajador de la que tiene. Como no tenía intercambio, la chispa se bloqueó mientras intentaba almacenar objetos para mezclarlos sin más memoria.

La solución fue agregar swap o configurar el trabajador / ejecutor para usar menos memoria además de usar el nivel de almacenamiento MEMORY_AND_DISK para varias persistencias.

Joren Van Severen
fuente
3
Si tiene un recurso en el nodo (memoria), puede intentar aumentar la memoria del ejecutor de Spark. Lo intentaré primero si también te preocupa el rendimiento.
nir
14
Hola @Joren, esto no es una competencia. El problema de OP es que el ejecutor no tiene suficiente memoria para almacenar la salida aleatoria. Lo que funcionó para usted no es disminuir la memoria del ejecutor, sino usar el nivel de almacenamiento MEMORY_AND_DISK que elimina la limitación de memoria del ejecutor. Además, OP no dice cuántos recursos tiene para albacea.
nir
Tengo el mismo problema y he probado métodos como aumentar la memoria del ejecutor, aumentar la cantidad de particiones y liberar más memoria física. Y a veces funcionó, mientras que otras no. Descubrí que esto solo sucedía en la fase de lectura aleatoria y me gustaría preguntar dónde puedo configurar StorageLevel.
Lhfcws
Optimicé mi estructura de datos y la arreglé. Acabo de cambiar HashMap a un byte [] que fue serializado por protostuff
Lhfcws
1
Intente cambiar spark.driver.overhead.memory y spark.executor.overhead.memory a un valor superior a 384 (predeterminado) y debería funcionar. Puede utilizar 1024 MB o 2048 MB.
rahul gulati
14

Tuvimos un error similar con Spark, pero no estoy seguro de que esté relacionado con su problema.

Usamos JavaPairRDD.repartitionAndSortWithinPartitionsdatos de 100 GB y seguía fallando de manera similar a su aplicación. Luego miramos los registros de Yarn en los nodos específicos y descubrimos que tenemos algún tipo de problema de falta de memoria, por lo que Yarn interrumpió la ejecución. Nuestra solución fue cambiar / añadir spark.shuffle.memoryFraction 0en .../spark/conf/spark-defaults.conf. Eso nos permitió manejar una cantidad mucho mayor (pero desafortunadamente no infinita) de datos de esta manera.

No en lista
fuente
¿Es realmente "0" o fue un error de escritura? ¿Cuál es la lógica detrás de eso, para obligarlo a derramarse permanentemente en el disco?
Virgil
@Virgil Sí. Hicimos algunas pruebas. Cuanto más nos acercábamos a cero, mayor era la cantidad procesable. El precio fue el 20% del tiempo.
Notinlist
Interesante, también reduzco spark.shuffle.memoryFraction a cero pero obtuve más errores seguidos. (A saber: MetadataFetchFailedException y FetchFailedException de forma intermitente) Debería convertirse en un error / problema si "all-spill" tiene menos errores que "parcialmente-spill".
tribbloid
11

Tengo el mismo problema en mi clúster YARN de 3 máquinas. Seguí cambiando la RAM pero el problema persistió. Finalmente vi los siguientes mensajes en los registros:

17/02/20 13:11:02 WARN spark.HeartbeatReceiver: Removing executor 2 with no recent heartbeats: 1006275 ms exceeds timeout 1000000 ms
17/02/20 13:11:02 ERROR cluster.YarnScheduler: Lost executor 2 on 1worker.com: Executor heartbeat timed out after 1006275 ms

y después de esto, apareció este mensaje:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 67

Modifiqué las propiedades en spark-defaults.conf de la siguiente manera:

spark.yarn.scheduler.heartbeat.interval-ms 7200000
spark.executor.heartbeatInterval 7200000
spark.network.timeout 7200000

¡Eso es! Mi trabajo se completó con éxito después de esto.

xplorerdev
fuente
En los documentos de chispa, se dice: spark.executor.heartbeatInterval should be significantly less than spark.network.timeout. Por lo tanto, establecer ambos en el mismo valor podría no ser la mejor idea.
Bitswazsky
2

Para mí, estaba haciendo algunas ventanas en datos grandes (alrededor de 50B filas) y obteniendo una carga de bote de

ExternalAppendOnlyUnsafeRowArray:54 - Se alcanzó el umbral de derrame de 4096 filas, cambiando a org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

En mis registros. Obviamente, 4096 puede ser pequeño en tal tamaño de datos ... esto me llevó a la siguiente JIRA:

https://issues.apache.org/jira/browse/SPARK-21595

Y finalmente a las siguientes dos opciones de configuración:

  • spark.sql.windowExec.buffer.spill.threshold
  • spark.sql.windowExec.buffer.in.memory.threshold

Ambos valores predeterminados son 4096; Los elevé mucho más alto (2097152) y las cosas ahora parecen ir bien. No estoy 100% seguro de que esto sea lo mismo que el problema planteado aquí, pero es otra cosa que intentar.

MichaelChirico
fuente
1

Resolví este error aumentando la memoria asignada en ExecutiveMemory y driverMemory. Puede hacer esto en HUE seleccionando el Programa Spark que está causando el problema y en propiedades -> Lista de opciones puede agregar algo como esto:

--driver-memory 10G --executor-memory 10G --num-executors 50 --executor-cores 2

Por supuesto, los valores de los parámetros variarán según el tamaño de su clúster y sus necesidades.

Ignacio Alorre
fuente
1

en la interfaz de usuario web de Spark, si hay alguna información como Executors lost, entonces debe verificar el registro de hilo, asegurarse de que su contenedor haya sido eliminado.

Si el contenedor se eliminó, probablemente se deba a la falta de memoria.

¿Cómo encontrar la información clave en los registros de hilo? Por ejemplo, puede haber algunas advertencias como esta:

Container killed by YARN for exceeding memory limits. 2.5 GB of 2.5 GB physical memory used. 
Consider boosting spark.yarn.executor.memoryOverhead.

En este caso, sugiere que debería aumentar spark.yarn.executor.memoryOverhead.

DennisLi
fuente
0

En mi caso (clúster independiente), se lanzó la excepción porque el sistema de archivos de algunos esclavos Spark se llenó al 100%. Eliminar todo en las spark/workcarpetas de los esclavos resolvió el problema.

i000174
fuente
0

Tengo el mismo problema, pero busqué muchas respuestas que no pueden resolver mi problema. eventualmente, depuro mi código paso a paso. Encuentro que el problema causado por el tamaño de los datos no está equilibrado para cada partición, lo que conduce a MetadataFetchFailedExceptioneso en mapetapa, no en reduceetapa. solo hazlo df_rdd.repartition(nums)antesreduceByKey()

Perro
fuente