Tengo una clase que toma objetos de a BlockingQueue
y los procesa llamando take()
en un bucle continuo. En algún momento sé que no se agregarán más objetos a la cola. ¿Cómo interrumpo el take()
método para que deje de bloquear?
Aquí está la clase que procesa los objetos:
public class MyObjHandler implements Runnable {
private final BlockingQueue<MyObj> queue;
public class MyObjHandler(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
MyObj obj = queue.take();
// process obj here
// ...
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Y aquí está el método que usa esta clase para procesar objetos:
public void testHandler() {
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
new Thread(handler).start();
// get objects for handler to process
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
queue.put(i.next());
}
// what code should go here to tell the handler
// to stop waiting for more objects?
}
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); MyObjectHandler handler = new MyObjectHandler(queue); Thread thread = new Thread(handler); thread.start(); for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) { queue.put(i.next()); } thread.interrupt();
Sin embargo, si hace esto, es posible que el hilo se interrumpa mientras todavía hay elementos en la cola esperando ser procesados. Es posible que desee considerar usar en
poll
lugar detake
, lo que permitirá que el hilo de procesamiento se agote y finalice cuando haya esperado un tiempo sin una nueva entrada.fuente
Thread.sleep()
como alternativa a un gancho adecuado. En otras implementaciones, otros subprocesos pueden colocar cosas en la cola y es posible que el ciclo while nunca termine.take()
implementación podría verse así:try { return take(); } catch (InterruptedException e) { E o = poll(); if (o == null) throw e; Thread.currentThread().interrupt(); return o; }
Sin embargo, no hay ninguna razón por la que deba implementarse en esta capa, e implementarla un poco más arriba conducirá a un código más eficiente (por ejemplo, al evitar el elemento por elementoInterruptedException
y / o usandoBlockingQueue.drainTo()
).Muy tarde, pero espero que esto ayude a otros también, ya que enfrenté un problema similar y utilicé el
poll
enfoque sugerido por erickson anteriormente con algunos cambios menores.class MyObjHandler implements Runnable { private final BlockingQueue<MyObj> queue; public volatile boolean Finished; //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL public MyObjHandler(BlockingQueue queue) { this.queue = queue; Finished = false; } @Override public void run() { while (true) { try { MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS); if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return { // process obj here // ... } if(Finished && queue.isEmpty()) return; } catch (InterruptedException e) { return; } } } } public void testHandler() { BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); MyObjHandler handler = new MyObjHandler(queue); new Thread(handler).start(); // get objects for handler to process for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) { queue.put(i.next()); } // what code should go here to tell the handler to stop waiting for more objects? handler.Finished = true; //THIS TELLS HIM //If you need you can wait for the termination otherwise remove join myThread.join(); }
Esto resolvió ambos problemas
BlockingQueue
para que sepa que no tiene que esperar más elementosfuente
Finished
variablevolatile
para garantizar la visibilidad entre hilos. Ver stackoverflow.com/a/106787Interrumpe el hilo:
fuente
O no interrumpas, es desagradable.
public class MyQueue<T> extends ArrayBlockingQueue<T> { private static final long serialVersionUID = 1L; private boolean done = false; public ParserQueue(int capacity) { super(capacity); } public void done() { done = true; } public boolean isDone() { return done; } /** * May return null if producer ends the production after consumer * has entered the element-await state. */ public T take() throws InterruptedException { T el; while ((el = super.poll()) == null && !done) { synchronized (this) { wait(); } } return el; } }
queue.notify()
, si termina, llamaqueue.done()
fuente
Que tal un
queue.add(new MyObj())
en algún hilo de productor, donde una bandera de parada le indica al hilo del consumidor que finalice el ciclo while?
fuente