Supongamos que tengo varios futuros y necesito esperar hasta que alguno de ellos falle o todos tengan éxito.
Por ejemplo: Let hay 3 futuros: f1
, f2
, f3
.
Si
f1
tiene éxito yf2
falla, no esperof3
(y devuelvo la falla al cliente).Si
f2
falla mientrasf1
yf3
siguen funcionando no los espero (y devuelvo falla )Si
f1
tiene éxito y luego lof2
logra, sigo esperandof3
.
¿Cómo lo implementaría?
scala
concurrency
future
Miguel
fuente
fuente
Respuestas:
En su lugar, podría usar una para la comprensión de la siguiente manera:
val fut1 = Future{...} val fut2 = Future{...} val fut3 = Future{...} val aggFut = for{ f1Result <- fut1 f2Result <- fut2 f3Result <- fut3 } yield (f1Result, f2Result, f3Result)
En este ejemplo, los futuros 1, 2 y 3 se inician en paralelo. Luego, en el modo de comprensión, esperamos hasta que estén disponibles los resultados 1 y luego 2 y luego 3. Si falla 1 o 2, no esperaremos más a 3. Si los 3 tienen éxito, entonces
aggFut
val mantendrá una tupla con 3 ranuras, correspondiente a los resultados de los 3 futuros.Ahora, si necesita el comportamiento en el que desea dejar de esperar si dice que fut2 falla primero, las cosas se ponen un poco más complicadas. En el ejemplo anterior, tendría que esperar a que se complete fut1 antes de darse cuenta de que fut2 falló. Para resolver eso, puedes probar algo como esto:
val fut1 = Future{Thread.sleep(3000);1} val fut2 = Promise.failed(new RuntimeException("boo")).future val fut3 = Future{Thread.sleep(1000);3} def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = { val fut = if (futures.size == 1) futures.head._2 else Future.firstCompletedOf(futures.values) fut onComplete{ case Success(value) if (futures.size == 1)=> prom.success(value :: values) case Success(value) => processFutures(futures - value, value :: values, prom) case Failure(ex) => prom.failure(ex) } prom.future } val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]()) aggFut onComplete{ case value => println(value) }
Ahora, esto funciona correctamente, pero el problema proviene de saber cuál
Future
eliminarMap
cuando se ha completado correctamente. Siempre que tenga alguna forma de correlacionar adecuadamente un resultado con el Futuro que generó ese resultado, algo como esto funciona. Simplemente sigue eliminando de forma recursiva los futuros completados del mapa y luego llamandoFuture.firstCompletedOf
al restoFutures
hasta que no quede ninguno, recopilando los resultados a lo largo del camino. No es bonito, pero si realmente necesitas el comportamiento del que estás hablando, entonces esto o algo similar podría funcionar.fuente
fut2
falla antesfut1
? ¿Seguiremos esperandofut1
en ese caso? Si lo hacemos, no es exactamente lo que quiero.onFailure
controlador parafut2
fallar rápido, y unaonSuccess
en laaggFut
que el éxito mango. Un éxito en lasaggFut
implicaciones sefut2
completó correctamente, por lo que solo se ha llamado a uno de los controladores.Puede usar una promesa y enviarle el primer error o el éxito agregado final completado:
def sequenceOrBailOut[A, M[_] <: TraversableOnce[_]](in: M[Future[A]] with TraversableOnce[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = { val p = Promise[M[A]]() // the first Future to fail completes the promise in.foreach(_.onFailure{case i => p.tryFailure(i)}) // if the whole sequence succeeds (i.e. no failures) // then the promise is completed with the aggregated success Future.sequence(in).foreach(p trySuccess _) p.future }
Luego, puede hacerlo en
Await
el resultadoFuture
si desea bloquear, o simplementemap
en otra cosa.La diferencia con para la comprensión es que aquí obtiene el error del primero en fallar, mientras que con para la comprensión obtiene el primer error en el orden transversal de la colección de entrada (incluso si otro falló primero). Por ejemplo:
val f1 = Future { Thread.sleep(1000) ; 5 / 0 } val f2 = Future { 5 } val f3 = Future { None.get } Future.sequence(List(f1,f2,f3)).onFailure{case i => println(i)} // this waits one second, then prints "java.lang.ArithmeticException: / by zero" // the first to fail in traversal order
Y:
val f1 = Future { Thread.sleep(1000) ; 5 / 0 } val f2 = Future { 5 } val f3 = Future { None.get } sequenceOrBailOut(List(f1,f2,f3)).onFailure{case i => println(i)} // this immediately prints "java.util.NoSuchElementException: None.get" // the 'actual' first to fail (usually...) // and it returns early (it does not wait 1 sec)
fuente
Aquí hay una solución sin utilizar actores.
import scala.util._ import scala.concurrent._ import java.util.concurrent.atomic.AtomicInteger // Nondeterministic. // If any failure, return it immediately, else return the final success. def allSucceed[T](fs: Future[T]*): Future[T] = { val remaining = new AtomicInteger(fs.length) val p = promise[T] fs foreach { _ onComplete { case s @ Success(_) => { if (remaining.decrementAndGet() == 0) { // Arbitrarily return the final success p tryComplete s } } case f @ Failure(_) => { p tryComplete f } } } p.future }
fuente
Puede hacer esto solo con futuros. Aquí hay una implementación. Tenga en cuenta que no terminará la ejecución antes de tiempo. En ese caso, debe hacer algo más sofisticado (y probablemente implementar la interrupción usted mismo). Pero si simplemente no quiere seguir esperando algo que no va a funcionar, la clave es seguir esperando que termine lo primero y detenerse cuando no quede nada o haga una excepción:
import scala.annotation.tailrec import scala.util.{Try, Success, Failure} import scala.concurrent._ import scala.concurrent.duration.Duration import ExecutionContext.Implicits.global @tailrec def awaitSuccess[A](fs: Seq[Future[A]], done: Seq[A] = Seq()): Either[Throwable, Seq[A]] = { val first = Future.firstCompletedOf(fs) Await.ready(first, Duration.Inf).value match { case None => awaitSuccess(fs, done) // Shouldn't happen! case Some(Failure(e)) => Left(e) case Some(Success(_)) => val (complete, running) = fs.partition(_.isCompleted) val answers = complete.flatMap(_.value) answers.find(_.isFailure) match { case Some(Failure(e)) => Left(e) case _ => if (running.length > 0) awaitSuccess(running, answers.map(_.get) ++: done) else Right( answers.map(_.get) ++: done ) } } }
Aquí hay un ejemplo en acción cuando todo funciona bien:
scala> awaitSuccess(Seq(Future{ println("Hi!") }, Future{ Thread.sleep(1000); println("Fancy meeting you here!") }, Future{ Thread.sleep(2000); println("Bye!") } )) Hi! Fancy meeting you here! Bye! res1: Either[Throwable,Seq[Unit]] = Right(List((), (), ()))
Pero cuando algo sale mal:
scala> awaitSuccess(Seq(Future{ println("Hi!") }, Future{ Thread.sleep(1000); throw new Exception("boo"); () }, Future{ Thread.sleep(2000); println("Bye!") } )) Hi! res2: Either[Throwable,Seq[Unit]] = Left(java.lang.Exception: boo) scala> Bye!
fuente
Para este propósito, usaría un actor Akka. A diferencia de la comprensión, falla tan pronto como falla cualquiera de los futuros, por lo que es un poco más eficiente en ese sentido.
class ResultCombiner(futs: Future[_]*) extends Actor { var origSender: ActorRef = null var futsRemaining: Set[Future[_]] = futs.toSet override def receive = { case () => origSender = sender for(f <- futs) f.onComplete(result => self ! if(result.isSuccess) f else false) case false => origSender ! SomethingFailed case f: Future[_] => futsRemaining -= f if(futsRemaining.isEmpty) origSender ! EverythingSucceeded } } sealed trait Result case object SomethingFailed extends Result case object EverythingSucceeded extends Result
Luego, cree el actor, envíele un mensaje (para que sepa dónde enviar su respuesta) y espere una respuesta.
val actor = actorSystem.actorOf(Props(new ResultCombiner(f1, f2, f3))) try { val f4: Future[Result] = actor ? () implicit val timeout = new Timeout(30 seconds) // or whatever Await.result(f4, timeout.duration).asInstanceOf[Result] match { case SomethingFailed => println("Oh noes!") case EverythingSucceeded => println("It all worked!") } } finally { // Avoid memory leaks: destroy the actor actor ! PoisonPill }
fuente
Esta pregunta ha sido respondida pero estoy publicando mi solución de clase de valor (las clases de valor se agregaron en 2.10) ya que no hay una aquí. Siéntete libre de criticar.
implicit class Sugar_PimpMyFuture[T](val self: Future[T]) extends AnyVal { def concurrently = ConcurrentFuture(self) } case class ConcurrentFuture[A](future: Future[A]) extends AnyVal { def map[B](f: Future[A] => Future[B]) : ConcurrentFuture[B] = ConcurrentFuture(f(future)) def flatMap[B](f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = concurrentFutureFlatMap(this, f) // work around no nested class in value class } def concurrentFutureFlatMap[A,B](outer: ConcurrentFuture[A], f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = { val p = Promise[B]() val inner = f(outer.future) inner.future onFailure { case t => p.tryFailure(t) } outer.future onFailure { case t => p.tryFailure(t) } inner.future onSuccess { case b => p.trySuccess(b) } ConcurrentFuture(p.future) }
ConcurrentFuture es un contenedor de Future sin gastos generales que cambia el mapa Future / flatMap predeterminado de hacer-esto-luego-aquello-combinar-todo-y-fallar-si-alguno-falla. Uso:
def func1 : Future[Int] = Future { println("f1!");throw new RuntimeException; 1 } def func2 : Future[String] = Future { Thread.sleep(2000);println("f2!");"f2" } def func3 : Future[Double] = Future { Thread.sleep(2000);println("f3!");42.0 } val f : Future[(Int,String,Double)] = { for { f1 <- func1.concurrently f2 <- func2.concurrently f3 <- func3.concurrently } yield for { v1 <- f1 v2 <- f2 v3 <- f3 } yield (v1,v2,v3) }.future f.onFailure { case t => println("future failed $t") }
En el ejemplo anterior, f1, f2 y f3 se ejecutarán simultáneamente y si alguno falla en cualquier orden, el futuro de la tupla fallará inmediatamente.
fuente
Es posible que desee consultar la API futura de Twitter. En particular, el método Future.collect. Hace exactamente lo que quieres: https://twitter.github.io/scala_school/finagle.html
El código fuente Future.scala está disponible aquí: https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala
fuente
Puedes usar esto:
val l = List(1, 6, 8) val f = l.map{ i => future { println("future " +i) Thread.sleep(i* 1000) if (i == 12) throw new Exception("6 is not legal.") i } } val f1 = Future.sequence(f) f1 onSuccess{ case l => { logInfo("onSuccess") l.foreach(i => { logInfo("h : " + i) }) } } f1 onFailure{ case l => { logInfo("onFailure") }
fuente