¿Puedes reequilibrar un Spliterator desequilibrado de tamaño desconocido?

12

Quiero usar a Streampara paralelizar el procesamiento de un conjunto heterogéneo de archivos JSON almacenados de forma remota de un número desconocido (el número de archivos no se conoce por adelantado). Los archivos pueden variar ampliamente en tamaño, desde 1 registro JSON por archivo hasta 100,000 registros en algunos otros archivos. Un registro JSON en este caso significa un objeto JSON autónomo representado como una línea en el archivo.

Realmente quiero usar Streams para esto, así que implementé esto Spliterator:

public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {

    abstract protected JsonStreamSupport<METADATA> openInputStream(String path);

    abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);

    private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
    private static final int MAX_BUFFER = 100;
    private final Iterator<String> paths;
    private JsonStreamSupport<METADATA> reader = null;

    public JsonStreamSpliterator(Iterator<String> paths) {
        this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
        super(est, additionalCharacteristics);
        this.paths = paths;
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
        this(est, additionalCharacteristics, paths);
        open(nextPath);
    }

    @Override
    public boolean tryAdvance(Consumer<? super RECORD> action) {
        if(reader == null) {
            String path = takeNextPath();
            if(path != null) {
                open(path);
            }
            else {
                return false;
            }
        }
        Map<String, Object> json = reader.readJsonLine();
        if(json != null) {
            RECORD item = parse(reader.getMetadata(), json);
            action.accept(item);
            return true;
        }
        else {
            reader.close();
            reader = null;
            return tryAdvance(action);
        }
    }

    private void open(String path) {
        reader = openInputStream(path);
    }

    private String takeNextPath() {
        synchronized(paths) {
            if(paths.hasNext()) {
                return paths.next();
            }
        }
        return null;
    }

    @Override
    public Spliterator<RECORD> trySplit() {
        String nextPath = takeNextPath();
        if(nextPath != null) {
            return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
                @Override
                protected JsonStreamSupport<METADATA> openInputStream(String path) {
                    return JsonStreamSpliterator.this.openInputStream(path);
                }
                @Override
                protected RECORD parse(METADATA metaData, Map<String,Object> json) {
                    return JsonStreamSpliterator.this.parse(metaData, json);
                }
            };              
        }
        else {
            List<RECORD> records = new ArrayList<RECORD>();
            while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
                // loop
            }
            if(records.size() != 0) {
                return records.spliterator();
            }
            else {
                return null;
            }
        }
    }
}

El problema que tengo es que, si bien el Stream se paraleliza maravillosamente al principio, eventualmente el archivo más grande se deja procesar en un solo hilo. Creo que la causa proximal está bien documentada: el spliterator está "desequilibrado".

Más concretamente, parece que el trySplitmétodo no se llama después de cierto punto en el Stream.forEachciclo de vida del mismo, por lo que la lógica adicional para distribuir lotes pequeños al final trySplitrara vez se ejecuta.

Observe cómo todos los spliteradores devueltos por trySplit comparten el mismo pathsiterador. Pensé que esta era una forma realmente inteligente de equilibrar el trabajo en todos los spliteradores, pero no ha sido suficiente para lograr un paralelismo completo.

Me gustaría que el procesamiento paralelo proceda primero a través de los archivos, y luego, cuando todavía quedan pocos archivos grandes divididos, quiero paralelizar a través de fragmentos de los archivos restantes. Esa fue la intención del elsebloque al final de trySplit.

¿Hay alguna manera fácil / simple / canónica de solucionar este problema?

Alex R
fuente
2
Necesita una estimación de tamaño. Puede ser totalmente falso, siempre que refleje aproximadamente la proporción de su división desequilibrada. De lo contrario, la transmisión no sabe que las divisiones están desequilibradas y se detendrán una vez que se haya creado un cierto número de fragmentos.
Holger
@Holger, ¿puede explicar "se detendrá una vez que se haya creado un cierto número de fragmentos" o señalarme la fuente de JDK para esto? ¿Cuál es el número de trozos donde se detiene?
Alex R
El código es irrelevante, ya que mostraría demasiados detalles de implementación irrelevantes, que podrían cambiar en cualquier momento. El punto relevante es que la implementación intenta dividir la llamada con la frecuencia suficiente, de modo que cada subproceso de trabajo (ajustado al número de núcleos de CPU) tiene algo que hacer. Para compensar las diferencias impredecibles en el tiempo de cómputo, es probable que produzca incluso más fragmentos que hilos de trabajo para permitir el robo de trabajo y usar los tamaños estimados como heurísticos (por ejemplo, para decidir qué sub-spliterator dividir más). Ver también stackoverflow.com/a/48174508/2711488
Holger
Hice algunos experimentos para tratar de entender tu comentario. Las heurísticas parecen ser bastante primitivas. Parece que el regreso Long.MAX_VALUEcausa una división excesiva e innecesaria, mientras que cualquier estimación que no sea la Long.MAX_VALUEcausa de la división adicional se detiene, matando el paralelismo. Devolver una combinación de estimaciones precisas no parece llevar a ninguna optimización inteligente.
Alex R
No estoy afirmando que la estrategia de implementación fue muy inteligente, pero al menos funciona para algunos escenarios con tamaños estimados (de lo contrario, hubo muchos más informes de errores al respecto). Parece que hubo algunos errores de su parte durante los experimentos. Por ejemplo, en el código de su pregunta, está ampliando AbstractSpliteratorpero anulando, lo trySplit()cual es un mal combo para otra cosa que no sea Long.MAX_VALUE, ya que no está adaptando el tamaño estimado trySplit(). Después trySplit(), la estimación del tamaño debe reducirse por el número de elementos que se han dividido.
Holger

Respuestas:

0

Su trySplitdebe escisiones de salida de igual tamaño, independientemente del tamaño de los archivos subyacentes. Debe tratar todos los archivos como una sola unidad y llenar el ArrayListspliterator respaldado con el mismo número de objetos JSON cada vez. El número de objetos debe ser tal que procesar una división demore entre 1 y 10 milisegundos: menos de 1 ms y comienza a acercarse a los costos de entregar el lote a un subproceso de trabajo, más alto que eso y comienza a arriesgar una carga de CPU desigual debido a tareas que son demasiado gruesas.

El spliterator no está obligado a informar una estimación de tamaño, y ya lo está haciendo correctamente: su estimación es Long.MAX_VALUE, que es un valor especial que significa "ilimitado". Sin embargo, si tiene muchos archivos con un solo objeto JSON, lo que resulta en lotes de tamaño 1, esto perjudicará su rendimiento de dos maneras: la sobrecarga de abrir, leer y cerrar el archivo puede convertirse en un cuello de botella y, si logra escapar eso, el costo de la transferencia de hilos puede ser significativo en comparación con el costo de procesar un artículo, lo que nuevamente causa un cuello de botella.

Hace cinco años estaba resolviendo un problema similar, puedes echar un vistazo a mi solución .

Marko Topolnik
fuente
Sí, "no está obligado a informar una estimación de tamaño" y Long.MAX_VALUEestá describiendo correctamente un tamaño desconocido, pero eso no ayuda cuando la implementación real de Stream funciona mal en ese momento. Incluso usando el resultado de ThreadLocalRandom.current().nextInt(100, 100_000)un tamaño estimado produce mejores resultados.
Holger
Funcionó bien para mis casos de uso, donde el costo computacional de cada artículo fue sustancial. Estaba alcanzando fácilmente el 98% del uso total de CPU y el rendimiento escalado casi linealmente con paralelismo. Básicamente, es importante obtener el tamaño de lote correcto para que el procesamiento tarde entre 1 y 10 milisegundos. Eso está muy por encima de los costos de transferencia de subprocesos y no es demasiado largo para causar problemas de granularidad de tareas. He publicado resultados de referencia al final de esta publicación .
Marko Topolnik
Su solución se divide ArraySpliteratory tiene un tamaño estimado (incluso un tamaño exacto). Por lo tanto, la implementación de Stream verá el tamaño de la matriz vs Long.MAX_VALUE, considere esto desequilibrado y divida el spliterator "más grande" (ignorando que Long.MAX_VALUEsignifica "desconocido"), hasta que no pueda dividirse más. Luego, si no hay suficientes fragmentos, dividirá los spliteradores basados ​​en matrices utilizando sus tamaños conocidos. Sí, esto funciona muy bien, pero no contradice mi afirmación de que necesita una estimación de tamaño, independientemente de lo pobre que sea.
Holger
OK, entonces parece ser un malentendido --- porque no necesita una estimación de tamaño en la entrada. Solo en las divisiones individuales, y siempre puedes tener eso.
Marko Topolnik
Bueno, mi primer comentario fue " Necesita una estimación de tamaño. Puede ser totalmente falso, siempre que refleje aproximadamente la proporción de su división desequilibrada " . El punto clave aquí fue que el código del OP crea otro spliterator que contiene un solo elemento, pero Todavía informa un tamaño desconocido. Esto es lo que hace que la implementación de Stream sea inútil. Cualquier número estimado para el nuevo spliterator sería significativamente menor que el que Long.MAX_VALUEsería.
Holger
0

Después de mucha experimentación, todavía no pude obtener ningún paralelismo adicional al jugar con las estimaciones de tamaño. Básicamente, cualquier valor que Long.MAX_VALUEno sea , tenderá a hacer que el spliterator finalice demasiado pronto (y sin división), mientras que, por otro lado, una Long.MAX_VALUEestimación hará trySplitque se llame implacablemente hasta que regrese null.

La solución que encontré es compartir internamente los recursos entre los separadores y dejar que se reequilibren entre ellos.

Código de trabajo:

public class AwsS3LineSpliterator<LINE> extends AbstractSpliterator<AwsS3LineInput<LINE>> {

    public final static class AwsS3LineInput<LINE> {
        final public S3ObjectSummary s3ObjectSummary;
        final public LINE lineItem;
        public AwsS3LineInput(S3ObjectSummary s3ObjectSummary, LINE lineItem) {
            this.s3ObjectSummary = s3ObjectSummary;
            this.lineItem = lineItem;
        }
    }

    private final class InputStreamHandler {
        final S3ObjectSummary file;
        final InputStream inputStream;
        InputStreamHandler(S3ObjectSummary file, InputStream is) {
            this.file = file;
            this.inputStream = is;
        }
    }

    private final Iterator<S3ObjectSummary> incomingFiles;

    private final Function<S3ObjectSummary, InputStream> fileOpener;

    private final Function<InputStream, LINE> lineReader;

    private final Deque<S3ObjectSummary> unopenedFiles;

    private final Deque<InputStreamHandler> openedFiles;

    private final Deque<AwsS3LineInput<LINE>> sharedBuffer;

    private final int maxBuffer;

    private AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener,
            Function<InputStream, LINE> lineReader,
            Deque<S3ObjectSummary> unopenedFiles, Deque<InputStreamHandler> openedFiles, Deque<AwsS3LineInput<LINE>> sharedBuffer,
            int maxBuffer) {
        super(Long.MAX_VALUE, 0);
        this.incomingFiles = incomingFiles;
        this.fileOpener = fileOpener;
        this.lineReader = lineReader;
        this.unopenedFiles = unopenedFiles;
        this.openedFiles = openedFiles;
        this.sharedBuffer = sharedBuffer;
        this.maxBuffer = maxBuffer;
    }

    public AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener, Function<InputStream, LINE> lineReader, int maxBuffer) {
        this(incomingFiles, fileOpener, lineReader, new ConcurrentLinkedDeque<>(), new ConcurrentLinkedDeque<>(), new ArrayDeque<>(maxBuffer), maxBuffer);
    }

    @Override
    public boolean tryAdvance(Consumer<? super AwsS3LineInput<LINE>> action) {
        AwsS3LineInput<LINE> lineInput;
        synchronized(sharedBuffer) {
            lineInput=sharedBuffer.poll();
        }
        if(lineInput != null) {
            action.accept(lineInput);
            return true;
        }
        InputStreamHandler handle = openedFiles.poll();
        if(handle == null) {
            S3ObjectSummary unopenedFile = unopenedFiles.poll();
            if(unopenedFile == null) {
                return false;
            }
            handle = new InputStreamHandler(unopenedFile, fileOpener.apply(unopenedFile));
        }
        for(int i=0; i < maxBuffer; ++i) {
            LINE line = lineReader.apply(handle.inputStream);
            if(line != null) {
                synchronized(sharedBuffer) {
                    sharedBuffer.add(new AwsS3LineInput<LINE>(handle.file, line));
                }
            }
            else {
                return tryAdvance(action);
            }
        }
        openedFiles.addFirst(handle);
        return tryAdvance(action);
    }

    @Override
    public Spliterator<AwsS3LineInput<LINE>> trySplit() {
        synchronized(incomingFiles) {
            if (incomingFiles.hasNext()) {
                unopenedFiles.add(incomingFiles.next());
                return new AwsS3LineSpliterator<LINE>(incomingFiles, fileOpener, lineReader, unopenedFiles, openedFiles, sharedBuffer, maxBuffer);
            } else {
                return null;
            }
        }
    }
}
Alex R
fuente