Casos de uso para planificadores RxJava

253

En RxJava hay 5 programadores diferentes para elegir:

  1. inmediata () : crea y devuelve un Programador que ejecuta el trabajo inmediatamente en el hilo actual.

  2. trampoline () : crea y devuelve un Programador que pone en cola el trabajo en el hilo actual para que se ejecute después de que se complete el trabajo actual.

  3. newThread () : crea y devuelve un programador que crea un nuevo subproceso para cada unidad de trabajo.

  4. computation () : crea y devuelve un programador destinado al trabajo computacional. Esto se puede usar para bucles de eventos, procesamiento de devoluciones de llamada y otros trabajos computacionales. No realice trabajos vinculados a IO en este planificador. Usar programadores. io () en su lugar.

  5. io () : crea y devuelve un planificador destinado a trabajos vinculados a IO. La implementación está respaldada por un grupo de subprocesos Ejecutor que crecerá según sea necesario. Esto se puede usar para realizar asincrónicamente el bloqueo de E / S. No realice trabajos computacionales en este planificador. Usar programadores. computation () en su lugar.

Preguntas:

Los primeros 3 programadores se explican por sí mismos; Sin embargo, estoy un poco confundido acerca de la computación y io .

  1. ¿Qué es exactamente el "trabajo vinculado a IO"? ¿Se utiliza para tratar streams ( java.io) y files ( java.nio.files)? ¿Se utiliza para consultas de bases de datos? ¿Se utiliza para descargar archivos o acceder a las API REST?
  2. ¿En qué se diferencia computation () de newThread () ? ¿Es que todas las llamadas de cómputo () están en un solo hilo (fondo) en lugar de un nuevo hilo (fondo) cada vez?
  3. ¿Por qué es malo llamar a computation () cuando hago trabajo IO?
  4. ¿Por qué es malo llamar a io () cuando se realiza un trabajo computacional?
bcorso
fuente

Respuestas:

332

Grandes preguntas, creo que la documentación podría ser más detallada.

  1. io()está respaldado por un grupo de subprocesos ilimitado y es el tipo de cosa que usaría para tareas no computacionalmente intensivas, es decir, cosas que no ponen mucha carga en la CPU. Entonces, la interacción con el sistema de archivos, la interacción con bases de datos o servicios en un host diferente son buenos ejemplos.
  2. computation()está respaldado por un grupo de subprocesos limitado con un tamaño igual al número de procesadores disponibles. Si trató de programar el trabajo intensivo de la CPU en paralelo en más de los procesadores disponibles (por ejemplo, usando newThread()), entonces está preparado para la sobrecarga de creación de subprocesos y la sobrecarga de cambio de contexto a medida que los subprocesos compiten por un procesador y es potencialmente un gran impacto en el rendimiento.
  3. Es mejor dejar computation()el trabajo intensivo de la CPU solo, de lo contrario no obtendrá una buena utilización de la CPU.
  4. Es malo pedir io()trabajo computacional por la razón que se analiza en 2. io()no tiene límites y si programa mil tareas computacionales io()en paralelo, cada una de esas mil tareas tendrá su propio hilo y competirá por la CPU incurriendo en costos de cambio de contexto.
Dave Moten
fuente
55
A través de la familiaridad con la fuente RxJava. Fue una fuente de confusión para mí durante mucho tiempo y creo que la documentación necesita reforzarse a este respecto.
Dave Moten
2
@IgorGanapolsky Supongo que es algo que rara vez querrías hacer. La creación de un nuevo hilo para cada unidad de trabajo rara vez conduce a la eficiencia, ya que los hilos son caros de construir y derribar. Por lo general, desea reutilizar hilos que computation () y otros planificadores hacen. La única vez que newThread () podría tener un uso legítimo (al menos se me ocurre) es iniciar tareas aisladas, infrecuentes y de larga duración. Incluso entonces podría usar io () para ese escenario.
tmn
44
¿Podría mostrar un ejemplo donde el trampolín () sería útil? Entiendo el concepto, pero no puedo entender un escenario que lo usaría en la práctica. Es el único programador que todavía me
resulta
32
Para llamadas de red, use Schedulers.io () y si necesita limitar el número de llamadas simultáneas de red, use Scheduler.from (Executors.newFixedThreadPool (n)).
Dave Moten el
44
Puede pensar que ponerlo timeoutde forma predeterminada computation()sería bloquear un hilo, pero no es el caso. Debajo de las cubiertas se computation()utiliza un ScheduledExecutorServicetiempo para que las acciones retrasadas no se bloqueen. Dado este hecho, computation()es una buena idea porque si estuviera en otro hilo, estaríamos sujetos a costos de cambio de hilo.
Dave Moten
3

El punto más importante es que tanto la programación Schedulers.io como Schedulers.com están respaldadas por grupos de subprocesos ilimitados en lugar de los otros mencionados en la pregunta. Esta característica solo es compartida por Schedulers.from (Ejecutor) en el caso de que el Ejecutor se cree con newCachedThreadPool (sin límites con un grupo de subprocesos de recuperación automática).

Como se ha explicado abundantemente en las respuestas anteriores y en varios artículos en la web, la publicación Schedulers.io y Schedulers.com se utilizará con cuidado, ya que están optimizados para el tipo de trabajo en su nombre. Pero, desde mi punto de vista, su función más importante es proporcionar concurrencia real a las corrientes reactivas .

Contrariamente a la creencia de los recién llegados, las corrientes reactivas no son inherentemente concurrentes sino inherentemente asíncronas y secuenciales. Por esta misma razón, Schedulers.io se usará solo cuando la operación de E / S esté bloqueando (por ejemplo: usando un comando de bloqueo como Apache IOUtils FileUtils.readFileAsString (...) ) así congelaría el hilo de llamada hasta que la operación esté hecho.

El uso de un método asincrónico como Java AsynchronousFileChannel (...) no bloquearía el hilo de llamada durante la operación, por lo que no tiene sentido usar un hilo separado. De hecho, los subprocesos Schedulers.io no son realmente adecuados para operaciones asincrónicas, ya que no ejecutan un bucle de eventos y la devolución de llamada nunca ... se llamará.

La misma lógica se aplica para el acceso a la base de datos o las llamadas remotas a la API. No use Schedulers.io si puede usar una API asíncrona o reactiva para realizar la llamada.

De vuelta a la concurrencia. Es posible que no tenga acceso a una API asíncrona o reactiva para realizar operaciones de E / S de forma asincrónica o concurrente, por lo que su única alternativa es enviar múltiples llamadas en un hilo separado. Por desgracia, las secuencias reactivas son secuenciales en sus extremos, pero la buena noticia es que el operador flatMap () puede introducir la concurrencia en su núcleo .

La simultaneidad debe construirse en la construcción de flujo, generalmente utilizando el operador flatMap () . Este potente operador se puede configurar para proporcionar internamente un contexto de subprocesos múltiples a su función incorporada flatMap () <T, R>. Ese contexto lo proporciona un programador de subprocesos múltiples, como Scheduler.io o Scheduler.computation .

Encuentre más detalles en artículos sobre RxJava2 Schedulers and Concurrency donde encontrará ejemplos de código y explicaciones detalladas sobre cómo usar Schedulers de forma secuencial y simultánea.

Espero que esto ayude,

Softjake

softjake
fuente
2

Esta publicación de blog ofrece una excelente respuesta

De la publicación del blog:

Schedulers.io () está respaldado por un grupo de subprocesos ilimitado. Se utiliza para trabajos de tipo de E / S que no requieren mucha CPU, incluida la interacción con el sistema de archivos, la realización de llamadas de red, interacciones de bases de datos, etc.

Schedulers.computation () está respaldado por un grupo de subprocesos limitado con un tamaño de hasta el número de procesadores disponibles. Se usa para el trabajo computacional o intensivo de la CPU, como cambiar el tamaño de las imágenes, procesar grandes conjuntos de datos, etc. Tenga cuidado: cuando asigna más subprocesos de cómputo que los núcleos disponibles, el rendimiento se degradará debido al cambio de contexto y la sobrecarga de la creación de subprocesos a medida que los subprocesos compiten por tiempo de procesadores.

Schedulers.newThread () crea un nuevo hilo para cada unidad de trabajo programada. Este programador es costoso ya que cada vez se genera un nuevo subproceso y no se reutiliza.

Schedulers.from (ejecutor ejecutor) crea y devuelve un programador personalizado respaldado por el ejecutor especificado. Para limitar el número de subprocesos simultáneos en el grupo de subprocesos, use Scheduler.from (Executors.newFixedThreadPool (n)). Esto garantiza que si una tarea se programa cuando todos los hilos están ocupados, se pondrá en cola. Los subprocesos en el grupo existirán hasta que se cierre explícitamente.

El subproceso principal o AndroidSchedulers.mainThread () es proporcionado por la biblioteca de extensiones RxAndroid para RxJava. El subproceso principal (también conocido como subproceso de interfaz de usuario) es donde ocurre la interacción del usuario. Se debe tener cuidado de no sobrecargar este hilo para evitar una interfaz de usuario que no responda de manera irregular o, peor aún, el cuadro de diálogo Aplicación que no responde (ANR).

Schedulers.single () es nuevo en RxJava 2. Este planificador está respaldado por un solo hilo que ejecuta tareas secuencialmente en el orden solicitado.

Schedulers.trampoline () ejecuta tareas de manera FIFO (Primero en entrar, Primero en salir) por uno de los hilos de trabajo participantes. A menudo se usa al implementar la recursión para evitar aumentar la pila de llamadas.

joe
fuente