Scala: Lista [Futuro] a [Lista] futuro sin tener en cuenta los futuros fallidos

116

Estoy buscando una manera de convertir una lista de futuros de longitud arbitraria en una lista de futuros. Estoy usando Playframework, así que en última instancia, lo que realmente quiero es un Future[Result], pero para simplificar las cosas, digamos que la Future[List[Int]]forma normal de hacer esto sería usar Future.sequence(...)pero hay un giro ... La lista que me dan generalmente tiene alrededor de 10-20 futuros en él, y no es raro que uno de esos futuros falle (están realizando solicitudes de servicios web externos). En lugar de tener que volver a intentarlo todos en caso de que uno de ellos falle, me gustaría poder acceder a los que tuvieron éxito y devolverlos.

Por ejemplo, hacer lo siguiente no funciona

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Success
import scala.util.Failure

val listOfFutures = Future.successful(1) :: Future.failed(new Exception("Failure")) :: 
                    Future.successful(3) :: Nil

val futureOfList = Future.sequence(listOfFutures)

futureOfList onComplete {
  case Success(x) => println("Success!!! " + x)
  case Failure(ex) => println("Failed !!! " + ex)
}

scala> Failed !!! java.lang.Exception: Failure

En lugar de obtener la única excepción, me gustaría poder sacar el 1 y el 3 de allí. Intenté usar Future.fold, pero aparentemente solo llama Future.sequencedetrás de escena.

¡Gracias de antemano por la ayuda!

Joe
fuente

Respuestas:

146

El truco consiste en asegurarse primero de que ninguno de los futuros haya fallado. .recoveres su amigo aquí, puede combinarlo con mappara convertir todos los Future[T]resultados en Future[Try[T]]]instancias, todas las cuales seguramente serán futuros exitosos.

nota: puede usar Optiono Eithertambién aquí, pero Tryes la forma más limpia si desea específicamente atrapar excepciones

def futureToFutureTry[T](f: Future[T]): Future[Try[T]] =
  f.map(Success(_)).recover { case x => Failure(x)}

val listOfFutures = ...
val listOfFutureTrys = listOfFutures.map(futureToFutureTry(_))

Luego úselo Future.sequencecomo antes, para darle unFuture[List[Try[T]]]

val futureListOfTrys = Future.sequence(listOfFutureTrys)

Luego filtrar:

val futureListOfSuccesses = futureListOfTrys.map(_.filter(_.isSuccess))

Incluso puede extraer las fallas específicas, si las necesita:

val futureListOfFailures = futureListOfTrys.map(_.filter(_.isFailure))
Kevin Wright
fuente
¡Gracias! .recoverera de hecho la pieza que faltaba para mí.
Joe
20
Puede utilizar en _.collect{ case Success(x) => x}lugar de _.filter(_.isSuccess)deshacerse de Tryen tipo de futureListOfSuccesses.
senia
43
En scala 2010 .recover(x => Failure(x))no es válido, use .recover({case e => Failure(e)})en su lugar
FGRibreau
Creo que te estás perdiendo el envoltorio futuro: def futureToFutureOfTry [A] (f: Future [A]): ​​Future [Try [A]] = {val p = Promise [Try [A]] () f.map {a => p.success (scala.util.Success (a))} .recover {case x: Throwable => p.success (Failure (x))} p.future}
Dario
no tan. Estoy mapeando un futuro a otro futuro, una Promesa intermedia no es necesaria y sería un desperdicio
Kevin Wright
12

Scala 2.12 tiene una mejora Future.transformque se presta en una respuesta con menos códigos.

val futures = Seq(Future{1},Future{throw new Exception})

// instead of `map` and `recover`, use `transform`
val seq = Future.sequence(futures.map(_.transform(Success(_)))) 

val successes = seq.map(_.collect{case Success(x)=>x})
successes
//res1: Future[Seq[Int]] = Future(Success(List(1)))

val failures = seq.map(_.collect{case Failure(x)=>x})
failures
//res2: Future[Seq[Throwable]] = Future(Success(List(java.lang.Exception)))
WeiChing 林 煒 清
fuente
11

Probé la respuesta de Kevin y encontré un problema técnico en mi versión de Scala (2.11.5) ... Lo corrigí y escribí algunas pruebas adicionales si alguien está interesado ... aquí está mi versión>

implicit class FutureCompanionOps(val f: Future.type) extends AnyVal {

    /** Given a list of futures `fs`, returns the future holding the list of Try's of the futures from `fs`.
      * The returned future is completed only once all of the futures in `fs` have been completed.
      */
    def allAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = {
      val listOfFutureTrys: List[Future[Try[T]]] = fItems.map(futureToFutureTry)
      Future.sequence(listOfFutureTrys)
    }

    def futureToFutureTry[T](f: Future[T]): Future[Try[T]] = {
      f.map(Success(_)) .recover({case x => Failure(x)})
    }

    def allFailedAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = {
      allAsTrys(fItems).map(_.filter(_.isFailure))
    }

    def allSucceededAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = {
      allAsTrys(fItems).map(_.filter(_.isSuccess))
    }
}


// Tests... 



  // allAsTrys tests
  //
  test("futureToFutureTry returns Success if no exception") {
    val future =  Future.futureToFutureTry(Future{"mouse"})
    Thread.sleep(0, 100)
    val futureValue = future.value
    assert(futureValue == Some(Success(Success("mouse"))))
  }
  test("futureToFutureTry returns Failure if exception thrown") {
    val future =  Future.futureToFutureTry(Future{throw new IllegalStateException("bad news")})
    Thread.sleep(5)            // need to sleep a LOT longer to get Exception from failure case... interesting.....
    val futureValue = future.value

    assertResult(true) {
      futureValue match {
        case Some(Success(Failure(error: IllegalStateException)))  => true
      }
    }
  }
  test("Future.allAsTrys returns Nil given Nil list as input") {
    val future =  Future.allAsTrys(Nil)
    assert ( Await.result(future, 100 nanosecond).isEmpty )
  }
  test("Future.allAsTrys returns successful item even if preceded by failing item") {
    val future1 =  Future{throw new IllegalStateException("bad news")}
    var future2 = Future{"dog"}

    val futureListOfTrys =  Future.allAsTrys(List(future1,future2))
    val listOfTrys =  Await.result(futureListOfTrys, 10 milli)
    System.out.println("successItem:" + listOfTrys);

    assert(listOfTrys(0).failed.get.getMessage.contains("bad news"))
    assert(listOfTrys(1) == Success("dog"))
  }
  test("Future.allAsTrys returns successful item even if followed by failing item") {
    var future1 = Future{"dog"}
    val future2 =  Future{throw new IllegalStateException("bad news")}

    val futureListOfTrys =  Future.allAsTrys(List(future1,future2))
    val listOfTrys =  Await.result(futureListOfTrys,  10 milli)
    System.out.println("successItem:" + listOfTrys);

    assert(listOfTrys(1).failed.get.getMessage.contains("bad news"))
    assert(listOfTrys(0) == Success("dog"))
  }
  test("Future.allFailedAsTrys returns the failed item and only that item") {
    var future1 = Future{"dog"}
    val future2 =  Future{throw new IllegalStateException("bad news")}

    val futureListOfTrys =  Future.allFailedAsTrys(List(future1,future2))
    val listOfTrys =  Await.result(futureListOfTrys,  10 milli)
    assert(listOfTrys(0).failed.get.getMessage.contains("bad news"))
    assert(listOfTrys.size == 1)
  }
  test("Future.allSucceededAsTrys returns the succeeded item and only that item") {
    var future1 = Future{"dog"}
    val future2 =  Future{throw new IllegalStateException("bad news")}

    val futureListOfTrys =  Future.allSucceededAsTrys(List(future1,future2))
    val listOfTrys =  Await.result(futureListOfTrys,  10 milli)
    assert(listOfTrys(0) == Success("dog"))
    assert(listOfTrys.size == 1)
  }
Chris Bedford
fuente
7

Acabo de encontrarme con esta pregunta y tengo otra solución que ofrecer:

def allSuccessful[A, M[X] <: TraversableOnce[X]](in: M[Future[A]])
                                                (implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], 
                                                 executor: ExecutionContext): Future[M[A]] = {
    in.foldLeft(Future.successful(cbf(in))) {
      (fr, fa)(for (r ← fr; a ← fa) yield r += a) fallbackTo fr
    } map (_.result())
}

La idea aquí es que dentro del pliegue está esperando que se complete el siguiente elemento de la lista (usando la sintaxis de comprensión) y si el siguiente falla, simplemente recurra a lo que ya tiene.

Idan Waisman
fuente
No me gusta el nombre pero me gusta la forma en que está hecho, directamente de la secuencia
impl
1

Puede envolver fácilmente el resultado futuro con la opción y luego aplanar la lista:

def futureToFutureOption[T](f: Future[T]): Future[Option[T]] =
    f.map(Some(_)).recover {
      case e => None
    }
val listOfFutureOptions = listOfFutures.map(futureToFutureOption(_))

val futureListOfOptions = Future.sequence(listOfFutureOptions)

val futureListOfSuccesses = futureListOfOptions.flatten
Amir Hossein Javan
fuente
En caso de que alguien más encuentre un error con Some en la primera función, la primera función se puede reescribir así para evitar errores del compilador: def futureToFutureOption [T] (f: Future [T]): Future [Option [T]] = f.map (Option (_)). recovery {case e => None}
Zee
0

También puede recopilar resultados exitosos y no exitosos en diferentes listas:

def safeSequence[A](futures: List[Future[A]]): Future[(List[Throwable], List[A])] = {
  futures.foldLeft(Future.successful((List.empty[Throwable], List.empty[A]))) { (flist, future) =>
    flist.flatMap { case (elist, alist) =>
      future
        .map { success => (elist, alist :+ success) }
        .recover { case error: Throwable => (elist :+ error, alist) }
    }
  }
}
Evgeniy Lyutikov
fuente