Quiero usar a Stream
para paralelizar el procesamiento de un conjunto heterogéneo de archivos JSON almacenados de forma remota de un número desconocido (el número de archivos no se conoce por adelantado). Los archivos pueden variar ampliamente en tamaño, desde 1 registro JSON por archivo hasta 100,000 registros en algunos otros archivos. Un registro JSON en este caso significa un objeto JSON autónomo representado como una línea en el archivo.
Realmente quiero usar Streams para esto, así que implementé esto Spliterator
:
public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {
abstract protected JsonStreamSupport<METADATA> openInputStream(String path);
abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);
private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
private static final int MAX_BUFFER = 100;
private final Iterator<String> paths;
private JsonStreamSupport<METADATA> reader = null;
public JsonStreamSpliterator(Iterator<String> paths) {
this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
}
private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
super(est, additionalCharacteristics);
this.paths = paths;
}
private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
this(est, additionalCharacteristics, paths);
open(nextPath);
}
@Override
public boolean tryAdvance(Consumer<? super RECORD> action) {
if(reader == null) {
String path = takeNextPath();
if(path != null) {
open(path);
}
else {
return false;
}
}
Map<String, Object> json = reader.readJsonLine();
if(json != null) {
RECORD item = parse(reader.getMetadata(), json);
action.accept(item);
return true;
}
else {
reader.close();
reader = null;
return tryAdvance(action);
}
}
private void open(String path) {
reader = openInputStream(path);
}
private String takeNextPath() {
synchronized(paths) {
if(paths.hasNext()) {
return paths.next();
}
}
return null;
}
@Override
public Spliterator<RECORD> trySplit() {
String nextPath = takeNextPath();
if(nextPath != null) {
return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
@Override
protected JsonStreamSupport<METADATA> openInputStream(String path) {
return JsonStreamSpliterator.this.openInputStream(path);
}
@Override
protected RECORD parse(METADATA metaData, Map<String,Object> json) {
return JsonStreamSpliterator.this.parse(metaData, json);
}
};
}
else {
List<RECORD> records = new ArrayList<RECORD>();
while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
// loop
}
if(records.size() != 0) {
return records.spliterator();
}
else {
return null;
}
}
}
}
El problema que tengo es que, si bien el Stream se paraleliza maravillosamente al principio, eventualmente el archivo más grande se deja procesar en un solo hilo. Creo que la causa proximal está bien documentada: el spliterator está "desequilibrado".
Más concretamente, parece que el trySplit
método no se llama después de cierto punto en el Stream.forEach
ciclo de vida del mismo, por lo que la lógica adicional para distribuir lotes pequeños al final trySplit
rara vez se ejecuta.
Observe cómo todos los spliteradores devueltos por trySplit comparten el mismo paths
iterador. Pensé que esta era una forma realmente inteligente de equilibrar el trabajo en todos los spliteradores, pero no ha sido suficiente para lograr un paralelismo completo.
Me gustaría que el procesamiento paralelo proceda primero a través de los archivos, y luego, cuando todavía quedan pocos archivos grandes divididos, quiero paralelizar a través de fragmentos de los archivos restantes. Esa fue la intención del else
bloque al final de trySplit
.
¿Hay alguna manera fácil / simple / canónica de solucionar este problema?
fuente
Long.MAX_VALUE
causa una división excesiva e innecesaria, mientras que cualquier estimación que no sea laLong.MAX_VALUE
causa de la división adicional se detiene, matando el paralelismo. Devolver una combinación de estimaciones precisas no parece llevar a ninguna optimización inteligente.AbstractSpliterator
pero anulando, lotrySplit()
cual es un mal combo para otra cosa que no seaLong.MAX_VALUE
, ya que no está adaptando el tamaño estimadotrySplit()
. DespuéstrySplit()
, la estimación del tamaño debe reducirse por el número de elementos que se han dividido.Respuestas:
Su
trySplit
debe escisiones de salida de igual tamaño, independientemente del tamaño de los archivos subyacentes. Debe tratar todos los archivos como una sola unidad y llenar elArrayList
spliterator respaldado con el mismo número de objetos JSON cada vez. El número de objetos debe ser tal que procesar una división demore entre 1 y 10 milisegundos: menos de 1 ms y comienza a acercarse a los costos de entregar el lote a un subproceso de trabajo, más alto que eso y comienza a arriesgar una carga de CPU desigual debido a tareas que son demasiado gruesas.El spliterator no está obligado a informar una estimación de tamaño, y ya lo está haciendo correctamente: su estimación es
Long.MAX_VALUE
, que es un valor especial que significa "ilimitado". Sin embargo, si tiene muchos archivos con un solo objeto JSON, lo que resulta en lotes de tamaño 1, esto perjudicará su rendimiento de dos maneras: la sobrecarga de abrir, leer y cerrar el archivo puede convertirse en un cuello de botella y, si logra escapar eso, el costo de la transferencia de hilos puede ser significativo en comparación con el costo de procesar un artículo, lo que nuevamente causa un cuello de botella.Hace cinco años estaba resolviendo un problema similar, puedes echar un vistazo a mi solución .
fuente
Long.MAX_VALUE
está describiendo correctamente un tamaño desconocido, pero eso no ayuda cuando la implementación real de Stream funciona mal en ese momento. Incluso usando el resultado deThreadLocalRandom.current().nextInt(100, 100_000)
un tamaño estimado produce mejores resultados.ArraySpliterator
y tiene un tamaño estimado (incluso un tamaño exacto). Por lo tanto, la implementación de Stream verá el tamaño de la matriz vsLong.MAX_VALUE
, considere esto desequilibrado y divida el spliterator "más grande" (ignorando queLong.MAX_VALUE
significa "desconocido"), hasta que no pueda dividirse más. Luego, si no hay suficientes fragmentos, dividirá los spliteradores basados en matrices utilizando sus tamaños conocidos. Sí, esto funciona muy bien, pero no contradice mi afirmación de que necesita una estimación de tamaño, independientemente de lo pobre que sea.Long.MAX_VALUE
sería.Después de mucha experimentación, todavía no pude obtener ningún paralelismo adicional al jugar con las estimaciones de tamaño. Básicamente, cualquier valor que
Long.MAX_VALUE
no sea , tenderá a hacer que el spliterator finalice demasiado pronto (y sin división), mientras que, por otro lado, unaLong.MAX_VALUE
estimación harátrySplit
que se llame implacablemente hasta que regresenull
.La solución que encontré es compartir internamente los recursos entre los separadores y dejar que se reequilibren entre ellos.
Código de trabajo:
fuente