He estado mirando nuevo rx java 2 y no estoy muy seguro de entender la idea de backpressuremás ...
Soy consciente de que tenemos Observableque no tiene backpressureapoyo y Flowableque lo tiene.
Entonces, basado en el ejemplo, digamos que tengo flowablecon interval:
Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
Esto va a fallar después de alrededor de 128 valores, y eso es bastante obvio que estoy consumiendo más lento que obteniendo elementos.
Pero luego tenemos lo mismo con Observable
Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
Esto no se bloqueará en absoluto, incluso cuando demore un poco el consumo, todavía funciona. Para que Flowablefuncione, digamos que puse onBackpressureDropoperador, el bloqueo se ha ido, pero tampoco se emiten todos los valores.
Entonces, la pregunta base que no puedo encontrar respuesta actualmente en mi cabeza es ¿por qué debería preocuparme backpressurecuando puedo usar simple Observabletodavía recibir todos los valores sin administrar el buffer? O tal vez desde el otro lado, ¿qué ventajas me backpressureda a favor de gestionar y manejar el consumo?

Respuestas:
Lo que la contrapresión se manifiesta en la práctica son búferes limitados,
Flowable.observeOntiene un búfer de 128 elementos que se drena tan rápido como la corriente descendente puede tomarlo. Puede aumentar este tamaño de búfer individualmente para manejar la fuente en ráfagas y todas las prácticas de manejo de contrapresión aún se aplican desde 1.x.Observable.observeOntiene un búfer ilimitado que sigue recopilando los elementos y su aplicación puede quedarse sin memoria.Puede utilizar,
Observablepor ejemplo:Puede utilizar,
Flowablepor ejemplo:fuente
Maybe,SingleyCompletablepuede siempre ser utilizado en lugar deFlowablecuando se apropian semánticamente?Maybe,Single, yCompletableson mucho demasiado pequeña para tener ninguna necesidad del concepto de contrapresión. No hay posibilidad de que un productor pueda emitir artículos más rápido de lo que se pueden consumir, ya que se producirán o consumirán de 0 a 1 artículos.La contrapresión es cuando su observable (editor) está creando más eventos de los que su suscriptor puede manejar. Por lo tanto, puede obtener suscriptores que faltan eventos, o puede obtener una enorme cola de eventos que eventualmente conduce a la falta de memoria.
Flowabletoma en consideración la contrapresión.Observableno. Eso es.me recuerda a un embudo que cuando tiene demasiado líquido se desborda. Flowable puede ayudar a que eso no suceda:
con tremenda contrapresión:
pero con el uso de fluido, hay mucha menos contrapresión:
Rxjava2 tiene algunas estrategias de contrapresión que puede usar según su caso de uso. por estrategia me refiero a Rxjava2 proporciona una forma de manejar los objetos que no se pueden procesar debido al desbordamiento (contrapresión).
aquí están las estrategias. No los revisaré todos, pero, por ejemplo, si no quiere preocuparse por los elementos que se desbordan, puede usar una estrategia de caída como esta:
observable.toFlowable (BackpressureStrategy.DROP)
Hasta donde yo sé, debería haber un límite de 128 elementos en la cola, después de eso puede haber un desbordamiento (contrapresión). Incluso si no es 128, está cerca de ese número. Espero que esto ayude a alguien.
si necesita cambiar el tamaño del búfer de 128, parece que se puede hacer así (pero observe las limitaciones de memoria:
En el desarrollo de software, por lo general, la estrategia de contrapresión significa que le dice al emisor que disminuya un poco la velocidad, ya que el consumidor no puede manejar la velocidad de sus eventos de emisión.
fuente
El hecho de que
Flowablese bloquee después de emitir 128 valores sin manejo de contrapresión no significa que siempre se bloqueará después de exactamente 128 valores: a veces se bloqueará después de 10 y, a veces, no se bloqueará en absoluto. Creo que esto es lo que sucedió cuando probó el ejemplo conObservable: no hubo contrapresión, por lo que su código funcionó normalmente, la próxima vez puede que no. La diferencia en RxJava 2 es que ya no existe el concepto de contrapresión enObservables, y no hay forma de manejarlo. Si está diseñando una secuencia reactiva que probablemente requerirá un manejo de contrapresión explícito, entoncesFlowablees su mejor opción.fuente
intervalsinbackpressure, ¿esperaría algún comportamiento o problemas extraños?