Spark java.lang.OutOfMemoryError: espacio de almacenamiento dinámico de Java

228

Mi clúster: 1 maestro, 11 esclavos, cada nodo tiene 6 GB de memoria.

Mi configuración:

spark.executor.memory=4g, Dspark.akka.frameSize=512

Aquí está el problema:

Primero , leí algunos datos (2,19 GB) de HDFS a RDD:

val imageBundleRDD = sc.newAPIHadoopFile(...)

En segundo lugar , haga algo en este RDD:

val res = imageBundleRDD.map(data => {
                               val desPoints = threeDReconstruction(data._2, bg)
                                 (data._1, desPoints)
                             })

Por último , salida a HDFS:

res.saveAsNewAPIHadoopFile(...)

Cuando ejecuto mi programa muestra:

.....
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL)
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as 30618515 bytes in 210 ms
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:36 as TID 34 on executor 2: Salve11.Hadoop (NODE_LOCAL)
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:36 as 30618515 bytes in 449 ms
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Starting task 1.0:32 as TID 35 on executor 7: Salve4.Hadoop (NODE_LOCAL)
Uncaught error from thread [spark-akka.actor.default-dispatcher-3] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark]
java.lang.OutOfMemoryError: Java heap space

Hay demasiadas tareas?

PD : Todo está bien cuando los datos de entrada son de aproximadamente 225 MB.

¿Como puedó resolver esté problema?

hequn8128
fuente
¿Cómo correr chispa? ¿Es de la consola? o qué scripts de despliegue utilizas?
Tombart
Uso sbt para compilar y ejecutar mi aplicación. sbt package luego sbt run. Implementé el mismo programa en hadoop hace un mes, y encontré el mismo problema de OutOfMemoryError, pero en hadoop se puede resolver fácilmente aumentando el valor de mapred.child.java.opts de Xmx200m a Xmx400m. ¿Tiene spark alguna configuración jvm para sus tareas? Me pregunto si spark.executor.memory tiene el mismo significado que mapred.child.java.opts en hadoop. En mi programa, spark.executor.memory ya se ha configurado en 4g mucho más grande que Xmx400m en hadoop. Gracias ~
hequn8128
¿Son los tres pasos que mencionas los únicos que haces? ¿Cuál es el tamaño de los datos generados por (data._1, desPoints)? Esto debería caber en la memoria especialmente si estos datos se envían a otra etapa
Arnon Rotem-Gal-Oz
1
¿Cuál es la configuración de memoria para el controlador? Compruebe qué servidor obtiene el error de falta de memoria. ¿Es el conductor o uno de los ejecutores?
RanP
Vea aquí todas las propiedades de configuración: spark.apache.org/docs/2.1.0/configuration.html
Naramsim

Respuestas:

364

Tengo algunas sugerencias:

  • Si los nodos están configurados para tener el máximo de 6 g de chispa (y están dejando un poco para otros procesos), a continuación, utilizar en lugar de 6 g 4g, spark.executor.memory=6g. Asegúrese de estar usando la mayor cantidad de memoria posible verificando la interfaz de usuario (le dirá la cantidad de memoria que está usando)
  • Intente usar más particiones, debería tener 2 - 4 por CPU. IME aumentar el número de particiones suele ser la forma más fácil de hacer que un programa sea más estable (y a menudo más rápido). Para grandes cantidades de datos puede que necesite más de 4 por CPU, ¡he tenido que usar 8000 particiones en algunos casos!
  • Disminuya la fracción de memoria reservada para el almacenamiento en caché , utilizando spark.storage.memoryFraction. Si no usa cache()o persisten su código, este también podría ser 0. Su valor predeterminado es 0.6, lo que significa que solo obtiene 0.4 * 4g de memoria para su montón. El IME que reduce la fracción de membranas a menudo hace que las OOM desaparezcan. ACTUALIZACIÓN: desde spark 1.6 aparentemente ya no necesitaremos jugar con estos valores, spark los determinará automáticamente.
  • Similar a la fracción de memoria anterior pero aleatoria . Si su trabajo no necesita mucha memoria aleatoria, configúrelo en un valor más bajo (esto podría ocasionar que sus mezclas se viertan en el disco, lo que puede tener un impacto catastrófico en la velocidad). A veces, cuando se trata de una operación aleatoria que es OOM, debe hacer lo contrario, es decir, configurarlo en algo grande, como 0.8, o asegurarse de permitir que sus barajaduras se derramen en el disco (es el valor predeterminado desde 1.0.0).
  • Tenga cuidado con las pérdidas de memoria , que a menudo son causadas por el cierre accidental de objetos que no necesita en sus lambdas. La forma de diagnosticar es buscar la "tarea serializada como XXX bytes" en los registros, si XXX es mayor que unos pocos k o más que un MB, puede tener una pérdida de memoria. Ver https://stackoverflow.com/a/25270600/1586965
  • Relacionado con lo anterior; use variables de difusión si realmente necesita objetos grandes.
  • Si está almacenando en caché RDD grandes y puede sacrificar algo de tiempo de acceso, considere serializar el RDD http://spark.apache.org/docs/latest/tuning.html#serialized-rdd-storage . O incluso almacenarlos en caché en el disco (que a veces no es tan malo si usa SSD).
  • ( Avanzado ) Relacionado con lo anterior, evite Stringestructuras muy anidadas ( Mapclases de casos similares y anidadas). Si es posible, intente usar solo tipos primitivos e indexe todos los no primitivos, especialmente si espera muchos duplicados. Elija WrappedArraysobre estructuras anidadas siempre que sea posible. O incluso despliegue su propia serialización: USTED tendrá la mayor cantidad de información sobre cómo hacer una copia de seguridad eficiente de sus datos en bytes, ¡ USE !
  • (un poco hacky ) Nuevamente al almacenar en caché, considere usar a Datasetpara almacenar en caché su estructura, ya que utilizará una serialización más eficiente. Esto debería considerarse como un truco en comparación con el punto anterior. Construir su conocimiento de dominio en su algoritmo / serialización puede minimizar la memoria / espacio de caché en 100x o 1000x, mientras que todo lo que se Datasetobtendrá es 2x - 5x en memoria y 10x comprimido (parquet) en disco.

http://spark.apache.org/docs/1.2.1/configuration.html

EDITAR: (para que pueda googlearme más fácilmente) Lo siguiente también es indicativo de este problema:

java.lang.OutOfMemoryError : GC overhead limit exceeded
samthebest
fuente
Gracias por sus sugerencias ~ Si configuro spark.executor.memory = 6g, spark tendrá el problema: "compruebe la IU de su clúster para asegurarse de que los trabajadores estén registrados y tengan suficiente memoria". Establecer spark.storage.memoryFraction en 0.1 tampoco puede resolver el problema. Tal vez el problema radica en mi código. ¡Gracias!
hequn8128
2
@samthebest Esta es una respuesta fantástica. Realmente aprecio la ayuda de registro para encontrar pérdidas de memoria.
Myles Baker
1
Hola @samthebest, ¿cómo especificaste 8000 particiones? Como estoy usando Spark sql, solo puedo especificar la partición usando spark.sql.shuffle.partitions, el valor predeterminado es 200. valor de partición Tengo 1 TB de datos asimétricos para procesar e involucra consultas grupales por colmena. Por favor guía.
Umesh K
2
Hola @ user449355 por favor, ¿podrías hacer una nueva pregunta? Por miedo a comenzar un largo hilo de comentarios :) Si tiene problemas, es probable que otras personas lo tengan, y una pregunta facilitaría la búsqueda para todos.
samthebest
1
Para su primer punto, @samthebest, no debe usar TODA la memoria spark.executor.memoryporque definitivamente necesita cierta cantidad de memoria para la sobrecarga de E / S. Si lo usa todo, ralentizará su programa. La excepción a esto podría ser Unix, en cuyo caso tiene espacio de intercambio.
Hunle
58

Para agregar un caso de uso a esto que a menudo no se discute, presentaré una solución cuando envíe una Sparksolicitud spark-submiten modo local .

De acuerdo con el gitbook Mastering Apache Spark de Jacek Laskowski :

Puede ejecutar Spark en modo local. En este modo de implementación de JVM simple no distribuido, Spark genera todos los componentes de ejecución (controlador, ejecutor, backend y maestro) en la misma JVM. Este es el único modo en el que se utiliza un controlador para la ejecución.

Por lo tanto, si experimenta OOMerrores con el heap, es suficiente ajustar el en driver-memorylugar del executor-memory.

Aquí hay un ejemplo:

spark-1.6.1/bin/spark-submit
  --class "MyClass"
  --driver-memory 12g
  --master local[*] 
  target/scala-2.10/simple-project_2.10-1.0.jar 
Brian
fuente
Cuánto porcentaje deberíamos considerar para la memoria del controlador en modo autónomo.
Yashwanth Kambala
@Brian, en modo local, ¿la memoria del controlador debe ser mayor que el tamaño de los datos de entrada? ¿Es posible especificar el número de particiones para el conjunto de datos de entrada, por lo que el trabajo de Spark puede tratar con un conjunto de datos mucho más grande que la RAM disponible?
fuyi
19

Debe configurar la memoria offHeap como se muestra a continuación:

val spark = SparkSession
     .builder()
     .master("local[*]")
     .config("spark.executor.memory", "70g")
     .config("spark.driver.memory", "50g")
     .config("spark.memory.offHeap.enabled",true)
     .config("spark.memory.offHeap.size","16g")   
     .appName("sampleCodeForReference")
     .getOrCreate()

Entregue la memoria del controlador y la memoria del ejecutor según la disponibilidad de RAM de su máquina. Puede aumentar el tamaño de offHeap si aún enfrenta el problema OutofMemory .

pavan.vn101
fuente
Añadido offHeap configuración ayudó
kennyut
2
configurar la memoria del controlador en su código no funcionará, lea la documentación de spark para esto: las propiedades de Spark se pueden dividir principalmente en dos tipos: una está relacionada con la implementación, como "spark.driver.memory", "spark.executor.instances", este tipo de propiedades pueden no verse afectadas cuando se configura mediante programación a través de SparkConf en tiempo de ejecución, o el comportamiento depende del administrador de clúster y el modo de implementación que elija, por lo que se sugiere configurar a través del archivo de configuración o las opciones de línea de comando de envío de chispa.
Abdulhafeth Sartawi
1
¡LA MEJOR RESPUESTA! Mi problema fue que Spark no estaba instalado en el nodo maestro, solo usé PySpark para conectarme a HDFS y obtuve el mismo error. Usando configresuelto el problema.
Mikhail_Sam
Acabo de agregar las configuraciones usando el comando spark-submit para solucionar el problema del tamaño del montón. Gracias.
Pritam Sadhukhan
16

Debe aumentar la memoria del controlador. Creo que en su carpeta $ SPARK_HOME / conf debería encontrar el archivo spark-defaults.conf, editarlo y configurarlo en spark.driver.memory 4000mfunción de la memoria de su maestro. Esto es lo que solucionó el problema para mí y todo funciona sin problemas

piel azul
fuente
Cuánto porcentaje de mem se asignará, en forma independiente
Yashwanth Kambala
14

Eche un vistazo a los scripts de inicio; allí se establece un tamaño de almacenamiento dinámico Java, parece que no está configurando esto antes de ejecutar Spark Worker.

# Set SPARK_MEM if it isn't already set since we also use it for this process
SPARK_MEM=${SPARK_MEM:-512m}
export SPARK_MEM

# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$OUR_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"

Puede encontrar la documentación para desplegar las escrituras aquí .

Tombart
fuente
Gracias ~ Lo intentaré más tarde. Desde spark ui, muestra que la memoria de cada ejecutor es 4096. Entonces, la configuración se ha habilitado, ¿verdad?
hequn8128
Vi su respuesta mientras enfrento un problema similar ( stackoverflow.com/questions/34762432/… ). Mirar el enlace que proporcionó parece que configurar Xms / Xmx ya no existe, ¿puede decir por qué?
Seffy
start up scriptsDesafortunadamente, el contenido del script vinculado a by ha cambiado. No existen tales opciones a partir del 2019-12-19
David Groomes
7

Sufrí mucho de este problema, utilizamos la asignación dinámica de recursos y pensé que utilizaría los recursos de mi clúster para adaptarse mejor a la aplicación.

Pero la verdad es que la asignación dinámica de recursos no establece la memoria del controlador y la mantiene en su valor predeterminado, que es 1g.

Lo resolví estableciendo spark.driver.memory en un número que se adapte a la memoria de mi controlador (para 32 gb de ram lo configuré en 18 gb)

puede configurarlo usando el comando de envío de chispa de la siguiente manera:

spark-submit --conf spark.driver.memory=18gb ....cont

Nota muy importante, esta propiedad no se tendrá en cuenta si la configura desde el código, de acuerdo con la documentación de spark:

Las propiedades de Spark se pueden dividir principalmente en dos tipos: uno está relacionado con la implementación, como "spark.driver.memory", "spark.executor.instances", este tipo de propiedades pueden no verse afectadas al configurar mediante programación a través de SparkConf en tiempo de ejecución, o el comportamiento depende del administrador de clúster y el modo de implementación que elija, por lo que se sugiere configurar a través del archivo de configuración o las opciones de línea de comando de envío de chispa; otro está relacionado principalmente con el control de tiempo de ejecución de Spark, como "spark.task.maxFailures", este tipo de propiedades se pueden establecer de cualquier manera.

Abdulhafeth Sartawi
fuente
2
Debería usar --conf spark.driver.memory = 18g
merenptah
5

En términos generales, la memoria chispe Executor JVM se puede dividir en dos partes. Memoria de chispa y memoria de usuario. Esto está controlado por la propiedad spark.memory.fraction: el valor está entre 0 y 1. Cuando trabaje con imágenes o realice un procesamiento intensivo de memoria en aplicaciones de chispa, considere disminuirspark.memory.fraction . Esto hará que haya más memoria disponible para el trabajo de su aplicación. La chispa puede derramarse, por lo que seguirá funcionando con menos memoria compartida.

La segunda parte del problema es la división del trabajo. Si es posible, particione sus datos en fragmentos más pequeños. Los datos más pequeños posiblemente necesitan menos memoria. Pero si eso no es posible, estás sacrificando el cómputo por la memoria. Por lo general, un solo ejecutor ejecutará múltiples núcleos. La memoria total de los ejecutores debe ser suficiente para manejar los requisitos de memoria de todas las tareas concurrentes. Si aumentar la memoria del ejecutor no es una opción, puede disminuir los núcleos por ejecutor para que cada tarea tenga más memoria para trabajar. Pruebe con 1 ejecutores de núcleo que tengan la mayor memoria posible que pueda brindar y luego siga aumentando los núcleos hasta encontrar el mejor conteo de núcleos.

Rohit Karlupia
fuente
5

¿Volcó su registro maestro de gc? Entonces encontré un problema similar y encontré que SPARK_DRIVER_MEMORY solo configuró el montón Xmx. El tamaño de almacenamiento dinámico inicial sigue siendo 1G y el tamaño de almacenamiento dinámico nunca escala hasta el almacenamiento dinámico Xmx.

Pasar "--conf" spark.driver.extraJavaOptions = -Xms20g "resuelve mi problema.

ps aux | grep java y verá el siguiente registro: =

24501 30.7 1.7 41782944 2318184 pts / 0 Sl + 18:49 0:33 / usr / java / latest / bin / java -cp / opt / spark / conf /: / opt / spark / jars / * -Xmx30g -Xms20g

Yunzhao Yang
fuente
3

La ubicación para establecer el tamaño del almacenamiento dinámico de memoria (al menos en spark-1.0.0) está en conf / spark-env. Las variables relevantes son SPARK_EXECUTOR_MEMORY& SPARK_DRIVER_MEMORY. Más documentos están en la guía de implementación

Además, no olvide copiar el archivo de configuración a todos los nodos esclavos.

Amnón
fuente
44
¿Cómo sabes cuál ajustar entre SPARK_EXECUTOR_MEMORY& SPARK_DRIVER_MEMORY?
Hunle
13
es decir, ¿qué error le indicaría que aumente SPARK_EXECUTOR_MEMORYy qué error le indicaría que aumente SPARK_DRIVER_MEMORY?
Hunle
2

Tengo pocas sugerencias para el error mencionado anteriormente.

● Compruebe que la memoria del ejecutor asignada como ejecutor podría tener que lidiar con particiones que requieren más memoria de la asignada.

● Intente ver si hay más shuffles en vivo, ya que los shuffles son operaciones costosas, ya que involucran E / S de disco, serialización de datos y E / S de red.

● Usar uniones de difusión

● Evite usar groupByKeys e intente reemplazar con ReduceByKey

● Evite el uso de enormes objetos Java donde sea que se baraje

Unmesha SreeVeni
fuente
Perdón por secuestrar la consulta de otra persona, pero ¿cómo usar reduceByKey sobre groupBy?
Somil Aseeja
1

Según tengo entendido el código proporcionado anteriormente, carga el archivo y realiza la operación de mapeo y lo guarda de nuevo. No hay ninguna operación que requiera barajar. Además, no hay ninguna operación que requiera que los datos se lleven al controlador, por lo tanto, ajustar cualquier cosa relacionada con la reproducción aleatoria o el controlador puede no tener ningún impacto. El controlador tiene problemas cuando hay demasiadas tareas, pero esto fue solo hasta la versión 2.0.2 de spark. Puede haber dos cosas que van mal.

  • Solo hay uno o unos pocos ejecutores. Aumente el número de ejecutores para que puedan asignarse a diferentes esclavos. Si está utilizando hilo necesita cambiar la configuración de los ejecutores numéricos o si está utilizando la chispa independiente, entonces necesita ajustar los núcleos numéricos por ejecutor y la configuración máxima de núcleos chispa. En número de ejecutores independientes = max núcleos / núcleos por ejecutor.
  • El número de particiones son muy pocas o tal vez solo una. Entonces, si esto es bajo, incluso si tenemos múltiples núcleos, múltiples ejecutores, no será de mucha ayuda ya que la paralelización depende del número de particiones. Aumente las particiones haciendo imageBundleRDD.repartition (11)
Shridhar
fuente
0

Establecer estas configuraciones exactas ayudó a resolver el problema.

spark-submit --conf spark.yarn.maxAppAttempts=2 --executor-memory 10g --num-executors 50 --driver-memory 12g
swapnil shashank
fuente