Uso de spark 2.4.4 que se ejecuta en modo de clúster YARN con el programador FIFO de spark.
Estoy enviando múltiples operaciones de trama de datos de chispa (es decir, escribiendo datos en S3) usando un ejecutor de grupo de subprocesos con un número variable de subprocesos. Esto funciona bien si tengo ~ 10 hilos, pero si uso cientos de hilos, parece que hay un punto muerto, sin trabajos programados de acuerdo con la interfaz de usuario de Spark.
¿Qué factores controlan cuántos trabajos se pueden programar simultáneamente? ¿Recursos del controlador (por ejemplo, memoria / núcleos)? ¿Alguna otra configuración de chispa?
EDITAR:
Aquí hay una breve sinopsis de mi código
ExecutorService pool = Executors.newFixedThreadPool(nThreads);
ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(pool);
Dataset<Row> aHugeDf = spark.read.json(hundredsOfPaths);
List<Future<Void>> futures = listOfSeveralHundredThings
.stream()
.map(aThing -> ecs.submit(() -> {
df
.filter(col("some_column").equalTo(aThing))
.write()
.format("org.apache.hudi")
.options(writeOptions)
.save(outputPathFor(aThing));
return null;
}))
.collect(Collectors.toList());
IntStream.range(0, futures.size()).forEach(i -> ecs.poll(30, TimeUnit.MINUTES));
exec.shutdownNow();
En algún momento, a medida que nThreads
aumenta, la chispa ya no parece estar programando ningún trabajo como lo demuestra:
ecs.poll(...)
tiempo de espera eventualmente- La pestaña de trabajos de Spark UI que no muestra trabajos activos
- La pestaña Ejecutores de Spark UI que no muestra tareas activas para ningún ejecutor
- La pestaña Spark UI SQL que muestra
nThreads
consultas en ejecución sin ID de trabajo en ejecución
Mi entorno de ejecución es
- AWS EMR 5.28.1
- Spark 2.4.4
- Nodo maestro =
m5.4xlarge
- Nodos centrales = 3x
rd5.24xlarge
spark.driver.cores=24
spark.driver.memory=32g
spark.executor.memory=21g
spark.scheduler.mode=FIFO
fuente
jstack -l
para obtener un volcado de subprocesos con información de bloqueo.Respuestas:
Si es posible, escriba la salida de los trabajos en AWS Elastic MapReduce hdfs (para aprovechar los cambios de nombre casi instantáneos y un mejor archivo IO de los hdf locales) y agregue un paso dstcp para mover los archivos a S3, para ahorrarse todos los problemas de manejar el entrañas de una tienda de objetos tratando de ser un sistema de archivos. También escribir en archivos hdf locales le permitirá habilitar la especulación para controlar las tareas fuera de control sin caer en las trampas de bloqueo asociadas con DirectOutputCommiter.
Si debe usar S3 como el directorio de salida, asegúrese de que estén configuradas las siguientes configuraciones de Spark
Nota: DirectParquetOutputCommitter se elimina de Spark 2.0 debido a la posibilidad de pérdida de datos. Desafortunadamente, hasta que hayamos mejorado la consistencia de S3a, tenemos que trabajar con las soluciones. Las cosas están mejorando con Hadoop 2.8
Evite los nombres clave en orden lexicográfico. Uno podría usar prefijos aleatorios / aleatorios o invertir fecha y hora para moverse. El truco consiste en nombrar las claves jerárquicamente, colocando las cosas más comunes por las que filtra en el lado izquierdo de la clave. Y nunca tenga guiones bajos en los nombres de los depósitos debido a problemas de DNS.
Habilitar
fs.s3a.fast.upload upload
partes de un solo archivo en Amazon S3 en paraleloConsulte estos artículos para obtener más detalles.
Configuración de spark.speculation en Spark 2.1.0 al escribir en s3
https://medium.com/@subhojit20_27731/apache-spark-and-amazon-s3-gotchas-and-best-practices-a767242f3d98
fuente
En mi opinión, es probable que estés abordando este problema mal A menos que pueda garantizar que el número de tareas por trabajo sea muy bajo, es probable que no obtenga muchas mejoras de rendimiento al paralelizar cientos de trabajos a la vez. Su clúster solo puede admitir 300 tareas a la vez, suponiendo que esté utilizando el paralelismo predeterminado de 200, es decir, solo 1.5 trabajos. Sugeriría reescribir su código para limitar el máximo de consultas simultáneas en 10. Sospecho que tiene 300 consultas con una sola tarea de varios cientos en ejecución. La mayoría de los sistemas de procesamiento de datos OLTP tienen intencionalmente un nivel bastante bajo de consultas concurrentes en comparación con los sistemas RDS más tradicionales por este motivo.
además
fuente