He estado mirando nuevo rx java 2 y no estoy muy seguro de entender la idea de backpressure
más ...
Soy consciente de que tenemos Observable
que no tiene backpressure
apoyo y Flowable
que lo tiene.
Entonces, basado en el ejemplo, digamos que tengo flowable
con interval
:
Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
Esto va a fallar después de alrededor de 128 valores, y eso es bastante obvio que estoy consumiendo más lento que obteniendo elementos.
Pero luego tenemos lo mismo con Observable
Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
Esto no se bloqueará en absoluto, incluso cuando demore un poco el consumo, todavía funciona. Para que Flowable
funcione, digamos que puse onBackpressureDrop
operador, el bloqueo se ha ido, pero tampoco se emiten todos los valores.
Entonces, la pregunta base que no puedo encontrar respuesta actualmente en mi cabeza es ¿por qué debería preocuparme backpressure
cuando puedo usar simple Observable
todavía recibir todos los valores sin administrar el buffer
? O tal vez desde el otro lado, ¿qué ventajas me backpressure
da a favor de gestionar y manejar el consumo?
Respuestas:
Lo que la contrapresión se manifiesta en la práctica son búferes limitados,
Flowable.observeOn
tiene un búfer de 128 elementos que se drena tan rápido como la corriente descendente puede tomarlo. Puede aumentar este tamaño de búfer individualmente para manejar la fuente en ráfagas y todas las prácticas de manejo de contrapresión aún se aplican desde 1.x.Observable.observeOn
tiene un búfer ilimitado que sigue recopilando los elementos y su aplicación puede quedarse sin memoria.Puede utilizar,
Observable
por ejemplo:Puede utilizar,
Flowable
por ejemplo:fuente
Maybe
,Single
yCompletable
puede siempre ser utilizado en lugar deFlowable
cuando se apropian semánticamente?Maybe
,Single
, yCompletable
son mucho demasiado pequeña para tener ninguna necesidad del concepto de contrapresión. No hay posibilidad de que un productor pueda emitir artículos más rápido de lo que se pueden consumir, ya que se producirán o consumirán de 0 a 1 artículos.La contrapresión es cuando su observable (editor) está creando más eventos de los que su suscriptor puede manejar. Por lo tanto, puede obtener suscriptores que faltan eventos, o puede obtener una enorme cola de eventos que eventualmente conduce a la falta de memoria.
Flowable
toma en consideración la contrapresión.Observable
no. Eso es.me recuerda a un embudo que cuando tiene demasiado líquido se desborda. Flowable puede ayudar a que eso no suceda:
con tremenda contrapresión:
pero con el uso de fluido, hay mucha menos contrapresión:
Rxjava2 tiene algunas estrategias de contrapresión que puede usar según su caso de uso. por estrategia me refiero a Rxjava2 proporciona una forma de manejar los objetos que no se pueden procesar debido al desbordamiento (contrapresión).
aquí están las estrategias. No los revisaré todos, pero, por ejemplo, si no quiere preocuparse por los elementos que se desbordan, puede usar una estrategia de caída como esta:
observable.toFlowable (BackpressureStrategy.DROP)
Hasta donde yo sé, debería haber un límite de 128 elementos en la cola, después de eso puede haber un desbordamiento (contrapresión). Incluso si no es 128, está cerca de ese número. Espero que esto ayude a alguien.
si necesita cambiar el tamaño del búfer de 128, parece que se puede hacer así (pero observe las limitaciones de memoria:
En el desarrollo de software, por lo general, la estrategia de contrapresión significa que le dice al emisor que disminuya un poco la velocidad, ya que el consumidor no puede manejar la velocidad de sus eventos de emisión.
fuente
El hecho de que
Flowable
se bloquee después de emitir 128 valores sin manejo de contrapresión no significa que siempre se bloqueará después de exactamente 128 valores: a veces se bloqueará después de 10 y, a veces, no se bloqueará en absoluto. Creo que esto es lo que sucedió cuando probó el ejemplo conObservable
: no hubo contrapresión, por lo que su código funcionó normalmente, la próxima vez puede que no. La diferencia en RxJava 2 es que ya no existe el concepto de contrapresión enObservable
s, y no hay forma de manejarlo. Si está diseñando una secuencia reactiva que probablemente requerirá un manejo de contrapresión explícito, entoncesFlowable
es su mejor opción.fuente
interval
sinbackpressure
, ¿esperaría algún comportamiento o problemas extraños?