Hay una manera, pero no te gustará. El siguiente método transforma a Future<T>
en a CompletableFuture<T>
:
public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
if (future.isDone())
return transformDoneFuture(future);
return CompletableFuture.supplyAsync(() -> {
try {
if (!future.isDone())
awaitFutureIsDoneInForkJoinPool(future);
return future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
});
}
private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
CompletableFuture<T> cf = new CompletableFuture<>();
T result;
try {
result = future.get();
} catch (Throwable ex) {
cf.completeExceptionally(ex);
return cf;
}
cf.complete(result);
return cf;
}
private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
throws InterruptedException {
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
@Override public boolean block() throws InterruptedException {
try {
future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
return true;
}
@Override public boolean isReleasable() {
return future.isDone();
}
});
}
Obviamente, el problema con este enfoque es que para cada Futuro , se bloqueará un hilo para esperar el resultado del Futuro, contradiciendo la idea de futuros. En algunos casos, podría ser mejor hacerlo. Sin embargo, en general, no hay solución sin esperar activamente el resultado del Futuro .
CompletableFuture.supplyAsync(supplier, new SinglethreadExecutor())
al menos no bloquearía los subprocesos comunes.Si la biblioteca que desea utilizar también ofrece un método de estilo de devolución de llamada además del estilo Future, puede proporcionarle un controlador que complete CompletableFuture sin ningún bloqueo de subproceso adicional. Al igual que:
AsynchronousFileChannel open = AsynchronousFileChannel.open(Paths.get("/some/file")); // ... CompletableFuture<ByteBuffer> completableFuture = new CompletableFuture<ByteBuffer>(); open.read(buffer, position, null, new CompletionHandler<Integer, Void>() { @Override public void completed(Integer result, Void attachment) { completableFuture.complete(buffer); } @Override public void failed(Throwable exc, Void attachment) { completableFuture.completeExceptionally(exc); } }); completableFuture.thenApply(...)
Sin la devolución de llamada, la única otra forma que veo para resolver esto es usar un bucle de sondeo que coloca todas sus
Future.isDone()
comprobaciones en un solo hilo y luego invoca complete cada vez que se pueda obtener un Future.fuente
Si tu
Future
es el resultado de una llamada a unExecutorService
método (por ejemplosubmit()
), lo más fácil sería usar elCompletableFuture.runAsync(Runnable, Executor)
método en su lugar.Desde
a
A
CompletableFuture
continuación, se crea "de forma nativa".EDITAR: Continuando con los comentarios de @SamMefford corregidos por @MartinAndersson, si desea pasar a
Callable
, debe llamarsupplyAsync()
, convirtiendo elCallable<T>
en aSupplier<T>
, por ejemplo, con:CompletableFuture.supplyAsync(() -> { try { return myCallable.call(); } catch (Exception ex) { throw new RuntimeException(ex); } // Or return default value }, myExecutor);
Debido a que
T Callable.call() throws Exception;
lanza una excepción yT Supplier.get();
no lo hace, debe detectar la excepción para que los prototipos sean compatibles.fuente
CompletableFuture<T> future = CompletableFuture.supplyAsync(myCallable, myExecutor);
supplyAsync
recibe unSupplier
. El código no se compilará si intenta pasar unCallable
.Callable<T>
en unSupplier<T>
.Publiqué un pequeño proyecto de futuro que intenta hacer algo mejor que el camino directo en la respuesta.
La idea principal es usar el único hilo (y, por supuesto, no solo con un bucle giratorio) para verificar todos los estados de Futures en el interior, lo que ayuda a evitar bloquear un hilo de un grupo para cada transformación Future -> CompletableFuture.
Ejemplo de uso:
fuente
Sugerencia:
http://www.thedevpiece.com/converting-old-java-future-to-completablefuture/
Pero, básicamente:
public class CompletablePromiseContext { private static final ScheduledExecutorService SERVICE = Executors.newSingleThreadScheduledExecutor(); public static void schedule(Runnable r) { SERVICE.schedule(r, 1, TimeUnit.MILLISECONDS); } }
Y, la Promesa Completable:
public class CompletablePromise<V> extends CompletableFuture<V> { private Future<V> future; public CompletablePromise(Future<V> future) { this.future = future; CompletablePromiseContext.schedule(this::tryToComplete); } private void tryToComplete() { if (future.isDone()) { try { complete(future.get()); } catch (InterruptedException e) { completeExceptionally(e); } catch (ExecutionException e) { completeExceptionally(e.getCause()); } return; } if (future.isCancelled()) { cancel(true); return; } CompletablePromiseContext.schedule(this::tryToComplete); } }
Ejemplo:
public class Main { public static void main(String[] args) { final ExecutorService service = Executors.newSingleThreadExecutor(); final Future<String> stringFuture = service.submit(() -> "success"); final CompletableFuture<String> completableFuture = new CompletablePromise<>(stringFuture); completableFuture.whenComplete((result, failure) -> { System.out.println(result); }); } }
fuente
CompletablePromiseContext
no estático y tomaría el parámetro para el intervalo de verificación (que se establece en 1 ms aquí) y luego sobrecargaría elCompletablePromise<V>
constructor para poder proporcionar el suyoCompletablePromiseContext
con un intervalo de verificación posiblemente diferente (más largo) para una ejecución prolongadaFuture<V>
donde no No es necesario que pueda ejecutar la devolución de llamada (o redactar) inmediatamente después de terminar, y también puede tener una instancia deCompletablePromiseContext
para ver un conjunto deFuture
(en caso de que tenga muchos)Permítanme sugerirles otra opción (con suerte, mejor): https://github.com/vsilaev/java-async-await/tree/master/com.farata.lang.async.examples/src/main/java/com/farata /concurrente
Brevemente, la idea es la siguiente:
CompletableTask<V>
interfaz: la unión delCompletionStage<V>
+RunnableFuture<V>
ExecutorService
para volverCompletableTask
desubmit(...)
métodos (en lugar deFuture<V>
)La implementación utiliza una implementación alternativa de CompletionStage (preste atención, CompletionStage en lugar de CompletableFuture):
Uso:
J8ExecutorService exec = J8Executors.newCachedThreadPool(); CompletionStage<String> = exec .submit( someCallableA ) .thenCombineAsync( exec.submit(someCallableB), (a, b) -> a + " " + b) .thenCombine( exec.submit(someCallableC), (ab, b) -> ab + " " + c);
fuente