¿Cómo configurar un grupo de subprocesos afinados para futuros?

80

¿Qué tan grande es el grupo de subprocesos de Scala para futuros?

Mi aplicación Scala genera muchos millones de future {}sy me pregunto si hay algo que pueda hacer para optimizarlos configurando un grupo de subprocesos.

Gracias.

Erik Kaplun
fuente
Slick 3.0 utiliza su propia conexión y grupo de subprocesos, entonces, ¿por qué necesitamos proporcionar un contexto de ejecución implícito para pulir cuando administra su propio grupo de subprocesos?
Rahul Gulabani
1
@RahulGulabani, del libro "essential slick":The reason is that map, flatMap methods of Action allows you to call arbitrary code when joining the actions together. Slick cannot allow that code to be run on its own execution context, because it has no way to know if you are going to tie up Slicks threads for a long time.
srzhio

Respuestas:

90

Puede especificar su propio ExecutionContext en el que se ejecutarán sus futuros, en lugar de importar el ExecutionContext implícito global.

import java.util.concurrent.Executors
import scala.concurrent._

implicit val ec = new ExecutionContext {
    val threadPool = Executors.newFixedThreadPool(1000)

    def execute(runnable: Runnable) {
        threadPool.submit(runnable)
    }

    def reportFailure(t: Throwable) {}
}
Josh Gao
fuente
73
Excelente respuesta, puede reducir un poco el texto estándar utilizando los métodos auxiliares en ExecutionContext que le permiten crear una instancia directamente desde un Ejecutor determinado. Por ejemplo, valor implícito ec = ExecutionContext.fromExecutor (Executors.newFixedThreadPool (10))
sksamuel
1
De acuerdo, todo esto está bien, pero ¿hay un límite real en los hilos de implicits.global? Si es así, ¿se puede configurar como akka a través de application.conf?
Nick
6
@Nick sí, implicits.global es solo 1 hilo por núcleo de CPU. Óptimo para tareas vinculadas a cpu. Pero para IO de bloqueo clásico (por ejemplo, jdbc) es un desastre de rendimiento.
Ben Hutchison
2
Necesitaba agregar una llamada para cerrar el grupo de subprocesos después de usar esto, o el programa nunca termina ... def shutdown () = threadPool.shutdown ()
justinhj
2
¿Por qué se apaga en el caso "normal", pero no cuando establecemos el implícito en otra cosa?
hbogert
152

Esta respuesta es de monkjack, un comentario de la respuesta aceptada. Sin embargo, uno puede perderse esta gran respuesta, así que la estoy volviendo a publicar aquí.

implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

Si solo necesita cambiar el recuento del grupo de subprocesos, simplemente use el ejecutor global y pase las siguientes propiedades del sistema.

-Dscala.concurrent.context.numThreads=8 -Dscala.concurrent.context.maxThreads=8
Bienvenido David
fuente
2
esta es una solución más elegante
Ben Hutchison
Probé ambos con el valor de 5, y todavía veo hasta 8 subprocesos ejecutándose al mismo tiempo.
micseydel
3

la mejor manera de especificar la agrupación de subprocesos en futuros de scala:

implicit val ec = new ExecutionContext {
      val threadPool = Executors.newFixedThreadPool(conf.getInt("5"));
      override def reportFailure(cause: Throwable): Unit = {};
      override def execute(runnable: Runnable): Unit = threadPool.submit(runnable);
      def shutdown() = threadPool.shutdown();
    }
S'chn T'gai Spock
fuente
0
class ThreadPoolExecutionContext(val executionContext: ExecutionContext)

object ThreadPoolExecutionContext {

  val executionContextProvider: ThreadPoolExecutionContext = {
    try {
      val executionContextExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(25))
      new ThreadPoolExecutionContext(executionContextExecutor)
    } catch {
      case exception: Exception => {
        Log.error("Failed to create thread pool", exception)
        throw exception
      }
    }
  }
}
VAIBHAV GOUR
fuente