¿Cómo interrumpir un BlockingQueue que está bloqueando en take ()?

82

Tengo una clase que toma objetos de a BlockingQueuey los procesa llamando take()en un bucle continuo. En algún momento sé que no se agregarán más objetos a la cola. ¿Cómo interrumpo el take()método para que deje de bloquear?

Aquí está la clase que procesa los objetos:

public class MyObjHandler implements Runnable {

  private final BlockingQueue<MyObj> queue;

  public class MyObjHandler(BlockingQueue queue) {
    this.queue = queue;
  }

  public void run() {
    try {
      while (true) {
        MyObj obj = queue.take();
        // process obj here
        // ...
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

Y aquí está el método que usa esta clase para procesar objetos:

public void testHandler() {

  BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);  

  MyObjectHandler  handler = new MyObjectHandler(queue);
  new Thread(handler).start();

  // get objects for handler to process
  for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
    queue.put(i.next());
  }

  // what code should go here to tell the handler
  // to stop waiting for more objects?
}
MCS
fuente

Respuestas:

72

Si interrumpir el hilo no es una opción, otra es colocar un objeto "marcador" o "comando" en la cola que MyObjHandler reconocería como tal y saldrá del bucle.

Chris Thornhill
fuente
39
Esto también se conoce como el enfoque 'Apagado de la píldora venenosa' y se analiza en profundidad en "Concurrencia de Java en la práctica", específicamente en las páginas 155-156.
Brandon Yarbrough
14
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
  queue.put(i.next());
}
thread.interrupt();

Sin embargo, si hace esto, es posible que el hilo se interrumpa mientras todavía hay elementos en la cola esperando ser procesados. Es posible que desee considerar usar en polllugar de take, lo que permitirá que el hilo de procesamiento se agote y finalice cuando haya esperado un tiempo sin una nueva entrada.

erickson
fuente
Sí, es un problema si el hilo se interrumpe mientras todavía hay elementos en la cola. Para solucionar esto, agregué código para asegurarme de que la cola esté vacía antes de interrumpir el hilo: <code> while (queue.size ()> 0) Thread.currentThread (). Sleep (5000); </code>
MCS
3
@MCS: tenga en cuenta para los futuros visitantes que su enfoque aquí es un truco y no debe reproducirse en el código de producción por tres razones. Siempre es preferible encontrar una forma real de conectar el apagado. Nunca es aceptable usarlo Thread.sleep()como alternativa a un gancho adecuado. En otras implementaciones, otros subprocesos pueden colocar cosas en la cola y es posible que el ciclo while nunca termine.
Erick Robertson
Afortunadamente, no hay necesidad de depender de tales hacks porque uno puede procesarlos con la misma facilidad después de la interrupción si es necesario. Por ejemplo, un "exhaustivo"take() implementación podría verse así: try { return take(); } catch (InterruptedException e) { E o = poll(); if (o == null) throw e; Thread.currentThread().interrupt(); return o; } Sin embargo, no hay ninguna razón por la que deba implementarse en esta capa, e implementarla un poco más arriba conducirá a un código más eficiente (por ejemplo, al evitar el elemento por elemento InterruptedExceptiony / o usando BlockingQueue.drainTo()).
antak
13

Muy tarde, pero espero que esto ayude a otros también, ya que enfrenté un problema similar y utilicé el pollenfoque sugerido por erickson anteriormente con algunos cambios menores.

class MyObjHandler implements Runnable 
{
    private final BlockingQueue<MyObj> queue;
    public volatile boolean Finished;  //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
    public MyObjHandler(BlockingQueue queue) 
    {
        this.queue = queue;
        Finished = false;
    }
    @Override
    public void run() 
    {        
        while (true) 
        {
            try 
            {
                MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
                if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return
                {
                    // process obj here
                    // ...
                }
                if(Finished && queue.isEmpty())
                    return;

            } 
            catch (InterruptedException e) 
            {                   
                return;
            }
        }
    }
}

public void testHandler() 
{
    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 

    MyObjHandler  handler = new MyObjHandler(queue);
    new Thread(handler).start();

    // get objects for handler to process
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
    {
        queue.put(i.next());
    }

    // what code should go here to tell the handler to stop waiting for more objects?
    handler.Finished = true; //THIS TELLS HIM
    //If you need you can wait for the termination otherwise remove join
    myThread.join();
}

Esto resolvió ambos problemas

  1. Marcado el BlockingQueuepara que sepa que no tiene que esperar más elementos
  2. No se interrumpió en el medio para que los bloques de procesamiento terminen solo cuando todos los elementos en la cola se procesan y no quedan elementos por agregar
dbw
fuente
2
Hacer Finishedvariable volatilepara garantizar la visibilidad entre hilos. Ver stackoverflow.com/a/106787
lukk
2
Si no me equivoco, el elemento final de la cola no se procesará. Cuando tomas el elemento final de la cola, terminado es verdadero y la cola está vacía, por lo que regresará antes de haber manejado ese elemento final. Agregue una tercera condición si (Finalizado && queue.isEmpty () && obj == null)
Matt R
@MattR gracias, correctamente informado, editaré la respuesta publicada
dbw
1

Interrumpe el hilo:

thread.interrupt()
Stepancheg
fuente
0

O no interrumpas, es desagradable.

    public class MyQueue<T> extends ArrayBlockingQueue<T> {

        private static final long serialVersionUID = 1L;
        private boolean done = false;

        public ParserQueue(int capacity) {  super(capacity); }

        public void done() { done = true; }

        public boolean isDone() { return done; }

        /**
         * May return null if producer ends the production after consumer 
         * has entered the element-await state.
         */
        public T take() throws InterruptedException {
            T el;
            while ((el = super.poll()) == null && !done) {
                synchronized (this) {
                    wait();
                }
            }

            return el;
        }
    }
  1. cuando el productor pone un objeto en la cola, llama queue.notify(), si termina, llamaqueue.done()
  2. bucle mientras (! queue.isDone () ||! queue.isEmpty ())
  3. test take () valor de retorno para nulo
tomasb
fuente
1
Yo diría que la solución anterior era más limpia y más simple que esta
sakthisundar
1
La bandera de listo es lo mismo que una píldora de veneno, se administra de manera diferente :)
David Mann
¿Limpiador? Lo dudo. No sabes exactamente cuándo se interrumpe el hilo. O déjame preguntarte, ¿qué es más limpio con eso? Es menos código, eso es un hecho.
tomasb
0

Que tal un

queue.add(new MyObj())

en algún hilo de productor, donde una bandera de parada le indica al hilo del consumidor que finalice el ciclo while?

Sam Ginrich
fuente
1
Antes de responder una pregunta anterior que tenga una respuesta aceptada (busque la ✓ verde), así como otras respuestas, asegúrese de que su respuesta agregue algo nuevo o sea útil en relación con ellas. Como tal, la respuesta a su pregunta viene dada por la respuesta aceptada para la pregunta de OP. - El formato Q / A de Stack Overflow no está diseñado para discusiones al estilo de un foro. No dé una respuesta preguntando "¿qué tal si hacemos X?" ya que no responde a la pregunta de OP, sino que es un comentario. Consulte las Pautas para colaboradores .
Ivo Mori
... visto, mi mensaje puede ser eliminado
Sam Ginrich