TL; DR
Cómo convertirTask.whenAll(List<Task>)
en RxJava
?
Mi código existente usa Bolts para crear una lista de tareas asincrónicas y espera hasta que todas esas tareas terminen antes de realizar otros pasos. Esencialmente, genera un List<Task>
y devuelve uno Task
que se marca como completado cuando se completan todas las tareas de la lista, como en el ejemplo del sitio de Bolts .
Estoy buscando reemplazar Bolts
con RxJava
y estoy asumiendo que este método de crear una lista de tareas asíncronas (el tamaño no se conoce de antemano) y agruparlas todas en una Observable
es posible, pero no sé cómo.
He intentado buscar en merge
, zip
, concat
etc ... pero no puedo ir a trabajar en la List<Observable>
que estaría acumulando ya que todos parecen estar orientados a trabajar en sólo dos Observables
a la vez si entiendo la documentación correctamente.
Estoy tratando de aprender RxJava
y todavía soy muy nuevo en eso, así que perdóneme si esta es una pregunta obvia o si se explica en los documentos en alguna parte; He intentado buscar. Cualquier ayuda será muy apreciada.
fuente
onErrorResumeNext
, ejemplo:Observable.zip(ob1, ob2........).onErrorResumeNext(Observable.<String>empty())
Puede usarlo
flatMap
en caso de que tenga composición de tareas dinámica. Algo como esto:public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) { return Observable.from(tasks) //execute in parallel .flatMap(task -> task.observeOn(Schedulers.computation())) //wait, until all task are executed //be aware, all your observable should emit onComplemete event //otherwise you will wait forever .toList() //could implement more intelligent logic. eg. check that everything is successful .map(results -> true); }
Otro buen ejemplo de ejecución paralela
Nota: No conozco realmente sus requisitos para el manejo de errores. Por ejemplo, qué hacer si solo falla una tarea. Creo que deberías verificar este escenario.
fuente
zip
notifica la finalización tan pronto como una de las tareas se complete y, por lo tanto, no es aplicable.new Func1<Observable<Boolean>, Observable<Boolean>>()...
y la segunda connew Func1<List<Boolean>, Boolean>()
De las sugerencias propuestas, zip () realidad combina resultados observables entre sí, que pueden ser o no lo que se desea, pero no se hizo en la pregunta. En la pregunta, todo lo que se quería era la ejecución de cada una de las operaciones, ya sea una a una o en paralelo (que no se especificó, pero el ejemplo de Bolts vinculado se trataba de una ejecución paralela). Además, zip () se completará inmediatamente cuando se complete alguno de los observables, por lo que infringe los requisitos.
Para la ejecución paralela de Observables, flatMap () presentado en la otra respuesta está bien, pero merge () sería más sencillo. Tenga en cuenta que la fusión se cerrará en caso de error de cualquiera de los Observables, si prefiere posponer la salida hasta que todos los observables hayan terminado, debería estar mirando mergeDelayError () .
Para uno por uno, creo que se debe usar el método estático Observable.concat () . Su javadoc dice así:
que suena como lo que busca si no quiere una ejecución paralela.
Además, si solo está interesado en completar su tarea, no en devolver valores, probablemente debería buscar Completable en lugar de Observable .
TLDR: para la ejecución de tareas una por una y el evento de finalización cuando se completan, creo que Completable.concat () es el más adecuado. Para la ejecución paralela, Completable.merge () o Completable.mergeDelayError () suena como la solución. El primero se detendrá inmediatamente ante cualquier error en cualquier completable, el segundo los ejecutará todos incluso si uno de ellos tiene un error, y solo entonces informa el error.
fuente
Probablemente miró al
zip
operador que trabaja con 2 Observables.También existe el método estático
Observable.zip
. Tiene un formulario que debería serle útil:Puede consultar el javadoc para obtener más información.
fuente
Con Kotlin
Es importante establecer el tipo de los argumentos de la función o tendrá errores de compilación
El último tipo de argumento cambia con el número de argumento: BiFunction para 2 Function3 para 3 Function4 para 4 ...
fuente
Estoy escribiendo código de cálculo en Kotlin con JavaRx Observables y RxKotlin. Quiero observar una lista de observables para completar y, mientras tanto, darme una actualización con el progreso y el último resultado. Al final, devuelve el mejor resultado de cálculo. Un requisito adicional era ejecutar Observables en paralelo para usar todos mis núcleos de CPU. Terminé con esta solución:
@Volatile var results: MutableList<CalculationResult> = mutableListOf() fun doALotOfCalculations(listOfCalculations: List<Calculation>): Observable<Pair<String, CalculationResult>> { return Observable.create { subscriber -> Observable.concatEager(listOfCalculations.map { calculation: Calculation -> doCalculation(calculation).subscribeOn(Schedulers.computation()) // function doCalculation returns an Observable with only one result }).subscribeBy( onNext = { results.add(it) subscriber.onNext(Pair("A calculation is ready", it)) }, onComplete = { subscriber.onNext(Pair("Finished: ${results.size}", findBestCalculation(results)) subscriber.onComplete() }, onError = { subscriber.onError(it) } ) } }
fuente
@Volatile
, pero ¿cómo funcionaría esto si varios hilos lo llaman al mismo tiempo? ¿Qué pasaría con los resultados?Tuve un problema similar, necesitaba buscar elementos de búsqueda de la llamada de descanso mientras también integraba sugerencias guardadas de RecentSearchProvider.AUTHORITY y las combinaba en una lista unificada. Estaba tratando de usar la solución @MyDogTom, desafortunadamente no hay Observable.from en RxJava. Después de investigar un poco, obtuve una solución que funcionó para mí.
fun getSearchedResultsSuggestions(context : Context, query : String) : Single<ArrayList<ArrayList<SearchItem>>> { val fetchedItems = ArrayList<Observable<ArrayList<SearchItem>>>(0) fetchedItems.add(fetchSearchSuggestions(context,query).toObservable()) fetchedItems.add(getSearchResults(query).toObservable()) return Observable.fromArray(fetchedItems) .flatMapIterable { data->data } .flatMap {task -> task.observeOn(Schedulers.io())} .toList() .map { ArrayList(it) } }
Creé un observable a partir de la matriz de observables que contiene listas de sugerencias y resultados de Internet según la consulta. Después de eso, simplemente repase esas tareas con flatMapIterable y ejecútelas usando flatmap, coloque los resultados en una matriz, que luego se puede recuperar en una vista de reciclaje.
fuente
Si usa Project Reactor, puede usar
Mono.when
.Mono.when(publisher1, publisher2) .map(i-> { System.out.println("everything is done!"); return i; }).block()
fuente