Comprimir secuencias usando JDK8 con lambda (java.util.stream.Streams.zip)

149

En JDK 8 con lambda b93 había una clase java.util.stream.Streams.zip en b93 que podría usarse para comprimir secuencias (esto se ilustra en el tutorial Explorando Java8 Lambdas. Parte 1 por Dhananjay Nene ). Esta función :

Crea una secuencia combinada lenta y secuencial cuyos elementos son el resultado de combinar los elementos de dos secuencias.

Sin embargo, en b98 esto ha desaparecido. De hecho, la Streamsclase ni siquiera es accesible en java.util.stream en b98 .

¿Se ha movido esta funcionalidad, y si es así, cómo comprimo las transmisiones de manera concisa usando b98?

La aplicación que tengo en mente está en esta implementación Java de Shen , donde reemplacé la funcionalidad zip en el

  • static <T> boolean every(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)
  • static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)

funciona con un código bastante detallado (que no utiliza la funcionalidad de b98).

artella
fuente
3
Ah, acabo de descubrir que parece que se ha eliminado por completo: mail.openjdk.java.net/pipermail/lambda-libs-spec-observers/…
artella
"Explorando las Lambdas de Java8. Parte 1" - el nuevo enlace para este artículo es blog.dhananjaynene.com/2013/02/exploring-java8-lambdas-part-1
Aleksei Egorov

Respuestas:

77

También necesitaba esto, así que tomé el código fuente de b93 y lo puse en una clase "util". Tuve que modificarlo ligeramente para trabajar con la API actual.

Como referencia, aquí está el código de trabajo (tómelo bajo su propio riesgo ...):

public static<A, B, C> Stream<C> zip(Stream<? extends A> a,
                                     Stream<? extends B> b,
                                     BiFunction<? super A, ? super B, ? extends C> zipper) {
    Objects.requireNonNull(zipper);
    Spliterator<? extends A> aSpliterator = Objects.requireNonNull(a).spliterator();
    Spliterator<? extends B> bSpliterator = Objects.requireNonNull(b).spliterator();

    // Zipping looses DISTINCT and SORTED characteristics
    int characteristics = aSpliterator.characteristics() & bSpliterator.characteristics() &
            ~(Spliterator.DISTINCT | Spliterator.SORTED);

    long zipSize = ((characteristics & Spliterator.SIZED) != 0)
            ? Math.min(aSpliterator.getExactSizeIfKnown(), bSpliterator.getExactSizeIfKnown())
            : -1;

    Iterator<A> aIterator = Spliterators.iterator(aSpliterator);
    Iterator<B> bIterator = Spliterators.iterator(bSpliterator);
    Iterator<C> cIterator = new Iterator<C>() {
        @Override
        public boolean hasNext() {
            return aIterator.hasNext() && bIterator.hasNext();
        }

        @Override
        public C next() {
            return zipper.apply(aIterator.next(), bIterator.next());
        }
    };

    Spliterator<C> split = Spliterators.spliterator(cIterator, zipSize, characteristics);
    return (a.isParallel() || b.isParallel())
           ? StreamSupport.stream(split, true)
           : StreamSupport.stream(split, false);
}
siki
fuente
1
¿No debería ser la secuencia resultante SIZEDsi cualquiera de las secuencias es SIZED, no ambas?
Didier L
55
No lo creo. Ambas transmisiones tienen que ser SIZEDpara que esta implementación funcione. De hecho, depende de cómo se defina la compresión. ¿Debería poder comprimir dos transmisiones que son de diferente tamaño, por ejemplo? ¿Cómo se vería la secuencia resultante entonces? Creo que esta es la razón por la cual esta función se omitió de la API. Hay muchas maneras de hacer esto y depende del usuario decidir qué comportamiento debe ser el "correcto". ¿Descartarías los elementos de la secuencia más larga o rellenarías la lista más corta? Si es así, ¿con qué valor (es)?
siki
A menos que me falte algo, no hay necesidad de ningún elenco (por ejemplo, a Spliterator<A>).
jub0bs
¿Hay un sitio web donde esté alojado el código fuente Java 8 b93? Tengo problemas para encontrarlo.
Starwarswii
42

zip es una de las funciones proporcionadas por la biblioteca protonpack .

Stream<String> streamA = Stream.of("A", "B", "C");
Stream<String> streamB  = Stream.of("Apple", "Banana", "Carrot", "Doughnut");

List<String> zipped = StreamUtils.zip(streamA,
                                      streamB,
                                      (a, b) -> a + " is for " + b)
                                 .collect(Collectors.toList());

assertThat(zipped,
           contains("A is for Apple", "B is for Banana", "C is for Carrot"));
Dominic Fox
fuente
1
también encontrado en StreamEx: amaembo.github.io/streamex/javadoc/one/util/streamex/…
tokland
34

Si tiene Guava en su proyecto, puede usar el método Streams.zip (se agregó en Guava 21):

Devuelve una secuencia en la que cada elemento es el resultado de pasar el elemento correspondiente de cada una de streamA y streamB a funcionar. La secuencia resultante solo será tan larga como la más corta de las dos secuencias de entrada; Si una secuencia es más larga, se ignorarán sus elementos adicionales. La secuencia resultante no es eficientemente divisible. Esto puede dañar el rendimiento paralelo.

 public class Streams {
     ...

     public static <A, B, R> Stream<R> zip(Stream<A> streamA,
             Stream<B> streamB, BiFunction<? super A, ? super B, R> function) {
         ...
     }
 }
ZhekaKozlov
fuente
26

Comprimir dos corrientes utilizando JDK8 con lambda ( GIST ).

public static <A, B, C> Stream<C> zip(Stream<A> streamA, Stream<B> streamB, BiFunction<A, B, C> zipper) {
    final Iterator<A> iteratorA = streamA.iterator();
    final Iterator<B> iteratorB = streamB.iterator();
    final Iterator<C> iteratorC = new Iterator<C>() {
        @Override
        public boolean hasNext() {
            return iteratorA.hasNext() && iteratorB.hasNext();
        }

        @Override
        public C next() {
            return zipper.apply(iteratorA.next(), iteratorB.next());
        }
    };
    final boolean parallel = streamA.isParallel() || streamB.isParallel();
    return iteratorToFiniteStream(iteratorC, parallel);
}

public static <T> Stream<T> iteratorToFiniteStream(Iterator<T> iterator, boolean parallel) {
    final Iterable<T> iterable = () -> iterator;
    return StreamSupport.stream(iterable.spliterator(), parallel);
}
Karol Król
fuente
2
¡Buena solución y (relativamente) compacta! Requiere que coloque import java.util.function.*;y import java.util.stream.*;en la parte superior de su archivo.
sffc
Tenga en cuenta que esta es una operación de terminal en la secuencia. Esto significa que para transmisiones infinitas, este método se descompone
smac89
2
Tanto envoltorios inútiles: Aquí () -> iteratory aquí de nuevo: iterable.spliterator(). ¿Por qué no implementar directamente un en Spliteratorlugar de un Iterator? Compruebe @Doradus respuesta stackoverflow.com/a/46230233/1140754
Miguel Gamboa
20

Como no puedo concebir el uso de comprimir en colecciones que no sean las indexadas (Listas) y soy un gran admirador de la simplicidad, esta sería mi solución:

<A,B,C>  Stream<C> zipped(List<A> lista, List<B> listb, BiFunction<A,B,C> zipper){
     int shortestLength = Math.min(lista.size(),listb.size());
     return IntStream.range(0,shortestLength).mapToObj( i -> {
          return zipper.apply(lista.get(i), listb.get(i));
     });        
}
Rafael
fuente
1
Creo que mapToObjectdebería serlo mapToObj.
seanf
si la lista no lo es RandomAccess(por ejemplo, en las listas vinculadas) esto será muy lento
avmohan
Seguro. Pero la mayoría de los desarrolladores de Java son conscientes de que LinkedList tiene un bajo rendimiento para las operaciones de acceso al índice.
Rafael
11

Los métodos de la clase que mencionó se han trasladado a la Streaminterfaz misma en favor de los métodos predeterminados. Pero parece que el zipmétodo ha sido eliminado. Tal vez porque no está claro cuál debería ser el comportamiento predeterminado para secuencias de diferentes tamaños. Pero implementar el comportamiento deseado es sencillo:

static <T> boolean every(
  Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) {
    Iterator<T> it=c2.iterator();
    return c1.stream().allMatch(x->!it.hasNext()||pred.test(x, it.next()));
}
static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) {
    Iterator<T> it=c2.iterator();
    return c1.stream().filter(x->it.hasNext()&&pred.test(x, it.next()))
      .findFirst().orElse(null);
}
Holger
fuente
¿No predicatehas pasado al filtro con estado ? Eso viola el contrato del método y, especialmente, no funcionará al procesar la transmisión en paralelo.
Andreas
2
@Andreas: ninguna de las soluciones aquí admite el procesamiento paralelo. Como mis métodos no devuelven una secuencia, se aseguran de que las secuencias no se ejecuten en paralelo. Del mismo modo, el código de la respuesta aceptada devuelve una secuencia que se puede convertir en paralelo pero que en realidad no hará nada en paralelo. Dicho esto, los predicados con estado se desaconsejan pero no violan el contrato. Incluso podrían usarse en contexto paralelo si se asegura de que la actualización de estado sea segura para subprocesos. En algunas situaciones son inevitables, por ejemplo, convertir un flujo en distinto es un predicado completo en sí mismo .
Holger
2
@Andreas: puedes adivinar por qué estas operaciones se han eliminado de la API de Java ...
Holger
8

Humildemente sugiero esta implementación. La secuencia resultante se trunca a la más corta de las dos secuencias de entrada.

public static <L, R, T> Stream<T> zip(Stream<L> leftStream, Stream<R> rightStream, BiFunction<L, R, T> combiner) {
    Spliterator<L> lefts = leftStream.spliterator();
    Spliterator<R> rights = rightStream.spliterator();
    return StreamSupport.stream(new AbstractSpliterator<T>(Long.min(lefts.estimateSize(), rights.estimateSize()), lefts.characteristics() & rights.characteristics()) {
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            return lefts.tryAdvance(left->rights.tryAdvance(right->action.accept(combiner.apply(left, right))));
        }
    }, leftStream.isParallel() || rightStream.isParallel());
}
Doradus
fuente
Me gusta tu propuesta Pero no estoy totalmente de acuerdo con el último .., leftStream.isParallel() || rightStream.isParallel(). Creo que no tiene ningún efecto porque AbstractSpliteratorofrece un paralelismo limitado por defecto. Así que creo que el resultado final será lo mismo que pasar false.
Miguel Gamboa
@MiguelGamboa: gracias por tu comentario. No estoy seguro de lo que quiere decir con "paralelismo limitado por defecto": ¿tiene un enlace a algunos documentos?
Doradus
6

La biblioteca Lazy-Seq proporciona funcionalidad zip.

https://github.com/nurkiewicz/LazySeq

Esta biblioteca está muy inspirada scala.collection.immutable.Streamy tiene como objetivo proporcionar una implementación de secuencia perezosa inmutable, segura para subprocesos y fácil de usar, posiblemente infinita.

Nick Siderakis
fuente
5

Usando la última biblioteca de guayaba (para la Streamsclase) deberías poder hacer

final Map<String, String> result = 
    Streams.zip(
        collection1.stream(), 
        collection2.stream(), 
        AbstractMap.SimpleEntry::new)
    .collect(Collectors.toMap(e -> e.getKey(), e  -> e.getValue()));
Dan Borza
fuente
2

Que este trabajo para usted? Es una función corta, que evalúa perezosamente las secuencias que está comprimiendo, por lo que puede suministrarle secuencias infinitas (no necesita tomar el tamaño de las secuencias que se están comprimiendo).

Si las secuencias son finitas, se detiene tan pronto como una de las secuencias se quede sin elementos.

import java.util.Objects;
import java.util.function.BiFunction;
import java.util.stream.Stream;

class StreamUtils {
    static <ARG1, ARG2, RESULT> Stream<RESULT> zip(
            Stream<ARG1> s1,
            Stream<ARG2> s2,
            BiFunction<ARG1, ARG2, RESULT> combiner) {
        final var i2 = s2.iterator();
        return s1.map(x1 -> i2.hasNext() ? combiner.apply(x1, i2.next()) : null)
                .takeWhile(Objects::nonNull);
    }
}

Aquí hay un código de prueba de unidad (¡mucho más largo que el código mismo!)

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;

class StreamUtilsTest {
    @ParameterizedTest
    @MethodSource("shouldZipTestCases")
    <ARG1, ARG2, RESULT>
    void shouldZip(
            String testName,
            Stream<ARG1> s1,
            Stream<ARG2> s2,
            BiFunction<ARG1, ARG2, RESULT> combiner,
            Stream<RESULT> expected) {
        var actual = StreamUtils.zip(s1, s2, combiner);

        assertEquals(
                expected.collect(Collectors.toList()),
                actual.collect(Collectors.toList()),
                testName);
    }

    private static Stream<Arguments> shouldZipTestCases() {
        return Stream.of(
                Arguments.of(
                        "Two empty streams",
                        Stream.empty(),
                        Stream.empty(),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.empty()),
                Arguments.of(
                        "One singleton and one empty stream",
                        Stream.of(1),
                        Stream.empty(),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.empty()),
                Arguments.of(
                        "One empty and one singleton stream",
                        Stream.empty(),
                        Stream.of(1),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.empty()),
                Arguments.of(
                        "Two singleton streams",
                        Stream.of("blah"),
                        Stream.of(1),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("blah", 1))),
                Arguments.of(
                        "One singleton, one multiple stream",
                        Stream.of("blob"),
                        Stream.of(2, 3),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("blob", 2))),
                Arguments.of(
                        "One multiple, one singleton stream",
                        Stream.of("foo", "bar"),
                        Stream.of(4),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("foo", 4))),
                Arguments.of(
                        "Two multiple streams",
                        Stream.of("nine", "eleven"),
                        Stream.of(10, 12),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("nine", 10), pair("eleven", 12)))
        );
    }

    private static List<Object> pair(Object o1, Object o2) {
        return List.of(o1, o2);
    }

    static private <T1, T2> List<Object> combine(T1 o1, T2 o2) {
        return List.of(o1, o2);
    }

    @Test
    void shouldLazilyEvaluateInZip() {
        final var a = new AtomicInteger();
        final var b = new AtomicInteger();
        final var zipped = StreamUtils.zip(
                Stream.generate(a::incrementAndGet),
                Stream.generate(b::decrementAndGet),
                (xa, xb) -> xb + 3 * xa);

        assertEquals(0, a.get(), "Should not have evaluated a at start");
        assertEquals(0, b.get(), "Should not have evaluated b at start");

        final var takeTwo = zipped.limit(2);

        assertEquals(0, a.get(), "Should not have evaluated a at take");
        assertEquals(0, b.get(), "Should not have evaluated b at take");

        final var list = takeTwo.collect(Collectors.toList());

        assertEquals(2, a.get(), "Should have evaluated a after collect");
        assertEquals(-2, b.get(), "Should have evaluated b after collect");
        assertEquals(List.of(2, 4), list);
    }
}
dominic
fuente
Tuve que soltar takeWhileal final que no parece estar en Java8, pero no es un problema, ya que la persona que llama puede filtrar los valores nulos que ocurren cuando las secuencias comprimidas no son del mismo tamaño. Creo que esta respuesta debería ser la respuesta número 1, ya que es consistente y comprensible. buen trabajo gracias de nuevo.
simbo1905
1
public class Tuple<S,T> {
    private final S object1;
    private final T object2;

    public Tuple(S object1, T object2) {
        this.object1 = object1;
        this.object2 = object2;
    }

    public S getObject1() {
        return object1;
    }

    public T getObject2() {
        return object2;
    }
}


public class StreamUtils {

    private StreamUtils() {
    }

    public static <T> Stream<Tuple<Integer,T>> zipWithIndex(Stream<T> stream) {
        Stream<Integer> integerStream = IntStream.range(0, Integer.MAX_VALUE).boxed();
        Iterator<Integer> integerIterator = integerStream.iterator();
        return stream.map(x -> new Tuple<>(integerIterator.next(), x));
    }
}
robby_pelssers
fuente
1

Cyclops-react de AOL , a lo que contribuyo, también proporciona funcionalidad de compresión, tanto a través de una implementación extendida de Stream , que también implementa la interfaz Reactive-streams ReactiveSeq, como a través de StreamUtils que ofrece gran parte de la misma funcionalidad a través de métodos estáticos a Java Streams estándar.

 List<Tuple2<Integer,Integer>> list =  ReactiveSeq.of(1,2,3,4,5,6)
                                                  .zip(Stream.of(100,200,300,400));


  List<Tuple2<Integer,Integer>> list = StreamUtils.zip(Stream.of(1,2,3,4,5,6),
                                                  Stream.of(100,200,300,400));

También ofrece una compresión basada en aplicaciones más generalizada. P.ej

   ReactiveSeq.of("a","b","c")
              .ap3(this::concat)
              .ap(of("1","2","3"))
              .ap(of(".","?","!"))
              .toList();

   //List("a1.","b2?","c3!");

   private String concat(String a, String b, String c){
    return a+b+c;
   }

E incluso la capacidad de emparejar cada elemento en una secuencia con cada elemento en otro

   ReactiveSeq.of("a","b","c")
              .forEach2(str->Stream.of(str+"!","2"), a->b->a+"_"+b);

   //ReactiveSeq("a_a!","a_2","b_b!","b_2","c_c!","c2")
John McClean
fuente
0

Si alguien necesita esto todavía, hay una StreamEx.zipWithfunción en la biblioteca streamex :

StreamEx<String> givenNames = StreamEx.of("Leo", "Fyodor")
StreamEx<String> familyNames = StreamEx.of("Tolstoy", "Dostoevsky")
StreamEx<String> fullNames = givenNames.zipWith(familyNames, (gn, fn) -> gn + " " + fn);

fullNames.forEach(System.out::println);  // prints: "Leo Tolstoy\nFyodor Dostoevsky\n"
const.grigoryev
fuente
-1

Esto es genial. Tuve que comprimir dos secuencias en un mapa, siendo una secuencia la clave y otra el valor

Stream<String> streamA = Stream.of("A", "B", "C");
Stream<String> streamB  = Stream.of("Apple", "Banana", "Carrot", "Doughnut");    
final Stream<Map.Entry<String, String>> s = StreamUtils.zip(streamA,
                    streamB,
                    (a, b) -> {
                        final Map.Entry<String, String> entry = new AbstractMap.SimpleEntry<String, String>(a, b);
                        return entry;
                    });

System.out.println(s.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())));

Salida: {A = Apple, B = Banana, C = Zanahoria}

Gnana
fuente