¿Es posible leer desde un InputStream con un tiempo de espera?

147

Específicamente, el problema es escribir un método como este:

int maybeRead(InputStream in, long timeout)

donde el valor de retorno es el mismo que in.read () si los datos están disponibles dentro de 'tiempo de espera' milisegundos, y -2 en caso contrario. Antes de que el método regrese, cualquier subproceso generado debe salir.

Para evitar discusiones, el tema aquí es java.io.InputStream, según lo documenta Sun (cualquier versión de Java). Tenga en cuenta que esto no es tan simple como parece. A continuación se presentan algunos hechos que están respaldados directamente por la documentación de Sun.

  1. El método in.read () puede no ser interrumpible.

  2. Ajustar el InputStream en un Reader o InterruptibleChannel no ayuda, porque todas esas clases pueden hacer es llamar a los métodos del InputStream. Si fuera posible usar esas clases, sería posible escribir una solución que simplemente ejecute la misma lógica directamente en InputStream.

  3. Siempre es aceptable que in.available () devuelva 0.

  4. El método in.close () puede bloquear o no hacer nada.

  5. No hay una forma general de matar otro hilo.

gris
fuente

Respuestas:

83

Usando inputStream.available ()

Siempre es aceptable que System.in.available () devuelva 0.

He encontrado lo contrario: siempre devuelve el mejor valor para la cantidad de bytes disponibles. Javadoc para InputStream.available():

Returns an estimate of the number of bytes that can be read (or skipped over) 
from this input stream without blocking by the next invocation of a method for 
this input stream.

Una estimación es inevitable debido al tiempo / estancamiento. La cifra puede ser una subestimación única porque constantemente llegan nuevos datos. Sin embargo, siempre "se pone al día" en la próxima llamada: debe tener en cuenta todos los datos recibidos, salvo el que llega justo en el momento de la nueva llamada. Devolver permanentemente 0 cuando hay datos falla la condición anterior.

Primera advertencia: las subclases concretas de InputStream son responsables de las disponibles ()

InputStreamEs una clase abstracta. No tiene fuente de datos. No tiene sentido tener datos disponibles. Por lo tanto, javadoc para available()también afirma:

The available method for class InputStream always returns 0.

This method should be overridden by subclasses.

Y, de hecho, las clases concretas de flujo de entrada anulan disponible (), proporcionando valores significativos, no ceros constantes.

Segunda advertencia: asegúrese de utilizar el retorno de carro al escribir la entrada en Windows.

Si lo usa System.in, su programa solo recibe información cuando su shell de comando lo entrega. Si está utilizando redirección / canalización de archivos (por ejemplo, algún archivo> java myJavaApp o somecommand | java myJavaApp), los datos de entrada generalmente se entregan de inmediato. Sin embargo, si escribe manualmente la entrada, la transferencia de datos puede retrasarse. Por ejemplo, con el shell cmd.exe de Windows, los datos se almacenan en el shell cmd.exe. Los datos solo se pasan al programa Java en ejecución después del retorno de carro (control-mo <enter>). Esa es una limitación del entorno de ejecución. Por supuesto, InputStream.available () devolverá 0 durante el tiempo que el shell guarde los datos, es el comportamiento correcto; No hay datos disponibles en ese momento. Tan pronto como los datos estén disponibles desde el shell, el método devuelve un valor> 0. NB: Cygwin usa cmd.

La solución más simple (sin bloqueo, por lo que no se requiere tiempo de espera)

Solo usa esto:

    byte[] inputData = new byte[1024];
    int result = is.read(inputData, 0, is.available());  
    // result will indicate number of bytes read; -1 for EOF with no data read.

O equivalente,

    BufferedReader br = new BufferedReader(new InputStreamReader(System.in, Charset.forName("ISO-8859-1")),1024);
    // ...
         // inside some iteration / processing logic:
         if (br.ready()) {
             int readCount = br.read(inputData, bufferOffset, inputData.length-bufferOffset);
         }

Solución más rica (llena al máximo el búfer dentro del tiempo de espera)

Declara esto:

public static int readInputStreamWithTimeout(InputStream is, byte[] b, int timeoutMillis)
     throws IOException  {
     int bufferOffset = 0;
     long maxTimeMillis = System.currentTimeMillis() + timeoutMillis;
     while (System.currentTimeMillis() < maxTimeMillis && bufferOffset < b.length) {
         int readLength = java.lang.Math.min(is.available(),b.length-bufferOffset);
         // can alternatively use bufferedReader, guarded by isReady():
         int readResult = is.read(b, bufferOffset, readLength);
         if (readResult == -1) break;
         bufferOffset += readResult;
     }
     return bufferOffset;
 }

Entonces usa esto:

    byte[] inputData = new byte[1024];
    int readCount = readInputStreamWithTimeout(System.in, inputData, 6000);  // 6 second timeout
    // readCount will indicate number of bytes read; -1 for EOF with no data read.
Glen Best
fuente
1
Si is.available() > 1024esta sugerencia fallará. Ciertamente, hay flujos que devuelven cero. SSLSockets, por ejemplo, hasta hace poco. No puedes confiar en esto.
Marqués de Lorne
El caso 'is.available ()> 1024' se trata específicamente a través de readLength.
Glen Best
Comentario re SSLSockets incorrecto: devuelve 0 para los disponibles si no hay datos en el búfer. Según mi respuesta. Javadoc: "Si no hay bytes almacenados en el zócalo y el zócalo no se ha cerrado usando cerrar, entonces disponible devolverá 0."
Glen Best
@GlenBest Mi comentario sobre SSLSocket no es incorrecto. Hasta hace poco [mi énfasis] solía devolver cero en todo momento. Estás hablando del presente. Estoy hablando de toda la historia de JSSE, y he trabajado desde antes de que se incluyera por primera vez en Java 1.4 en 2002 .
Marqués de Lorne
Al cambiar las condiciones del bucle while a "while (is.available ()> 0 && System.currentTimeMillis () <maxTimeMillis && bufferOffset <b.length) {" me ahorró una tonelada de sobrecarga de la CPU.
Lógica1
65

Suponiendo que su transmisión no esté respaldada por un socket (por lo que no puede usarla Socket.setSoTimeout()), creo que la forma estándar de resolver este tipo de problema es usar un Futuro.

Supongamos que tengo el siguiente ejecutor y transmisiones:

    ExecutorService executor = Executors.newFixedThreadPool(2);
    final PipedOutputStream outputStream = new PipedOutputStream();
    final PipedInputStream inputStream = new PipedInputStream(outputStream);

Tengo un escritor que escribe algunos datos y luego espera 5 segundos antes de escribir el último dato y cerrar la transmisión:

    Runnable writeTask = new Runnable() {
        @Override
        public void run() {
            try {
                outputStream.write(1);
                outputStream.write(2);
                Thread.sleep(5000);
                outputStream.write(3);
                outputStream.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    };
    executor.submit(writeTask);

La forma normal de leer esto es la siguiente. La lectura se bloqueará indefinidamente para los datos y esto se completa en 5s:

    long start = currentTimeMillis();
    int readByte = 1;
    // Read data without timeout
    while (readByte >= 0) {
        readByte = inputStream.read();
        if (readByte >= 0)
            System.out.println("Read: " + readByte);
    }
    System.out.println("Complete in " + (currentTimeMillis() - start) + "ms");

que salidas:

Read: 1
Read: 2
Read: 3
Complete in 5001ms

Si hubiera un problema más fundamental, como si el escritor no respondiera, el lector bloquearía para siempre. Si finalizo la lectura en un futuro, puedo controlar el tiempo de espera de la siguiente manera:

    int readByte = 1;
    // Read data with timeout
    Callable<Integer> readTask = new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            return inputStream.read();
        }
    };
    while (readByte >= 0) {
        Future<Integer> future = executor.submit(readTask);
        readByte = future.get(1000, TimeUnit.MILLISECONDS);
        if (readByte >= 0)
            System.out.println("Read: " + readByte);
    }

que salidas:

Read: 1
Read: 2
Exception in thread "main" java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
    at java.util.concurrent.FutureTask.get(FutureTask.java:91)
    at test.InputStreamWithTimeoutTest.main(InputStreamWithTimeoutTest.java:74)

Puedo atrapar la TimeoutException y hacer la limpieza que quiera.

Ian Jones
fuente
14
¿Pero qué pasa con el hilo de bloqueo? ¿Permanecerá en la memoria hasta que finalice la aplicación? Si estoy en lo cierto, esto puede producir subprocesos interminables, la aplicación está cargada y aún más, bloquea el uso de otros subprocesos de su grupo que tiene sus subprocesos ocupados y bloqueados. Por favor corrígeme si estoy equivocado. Gracias.
Muhammad Gelbana
44
Muhammad Gelbana, tiene razón: el hilo de lectura de lectura () permanece en ejecución y eso no está bien. Sin embargo, he encontrado una manera de evitar esto: cuando llegue el tiempo de espera, cierre desde el hilo de llamada el flujo de entrada (en mi caso, cierro el zócalo bluetooth de Android del que proviene el flujo de entrada). Cuando haces eso, la llamada read () regresará inmediatamente. Bueno, en mi caso uso la sobrecarga int read (byte []), y esa regresa inmediatamente. Tal vez la sobrecarga int read () arrojaría una IOException ya que no sé qué devolvería ... En mi opinión, esa es la solución adecuada.
Emmanuel Touzery
55
-1 ya que la lectura de subprocesos permanece bloqueada hasta que finaliza la aplicación.
Ortwin Angermeier
11
@ortang Eso es lo que quise decir con "capturar la TimeoutException y hacer cualquier limpieza ..." Por ejemplo, podría querer matar el hilo de lectura: ... catch (TimeoutException e) {executeor.shutdownNow (); }
Ian Jones
12
executer.shutdownNowNo matará el hilo. Intentará interrumpirlo, sin efecto. No hay limpieza posible y este es un problema grave.
Marko Topolnik
22

Si su InputStream está respaldado por un Socket, puede establecer un tiempo de espera de Socket (en milisegundos) usando setSoTimeout . Si la llamada read () no se desbloquea dentro del tiempo de espera especificado, arrojará una SocketTimeoutException.

Solo asegúrese de llamar a setSoTimeout en el Socket antes de hacer la llamada read ().

Templarios
fuente
18

Yo cuestionaría el enunciado del problema en lugar de simplemente aceptarlo ciegamente. Solo necesita tiempos de espera de la consola o de la red. Si tiene este último Socket.setSoTimeout()y HttpURLConnection.setReadTimeout()ambos hacen exactamente lo que se requiere, siempre que los configure correctamente cuando los construya / adquiera. Dejarlo en un punto arbitrario más adelante en la aplicación cuando todo lo que tiene es que InputStream es un diseño deficiente que conduce a una implementación muy incómoda.

Marqués de Lorne
fuente
10
Hay otras situaciones en las que una lectura podría bloquearse potencialmente durante un tiempo significativo; por ejemplo, al leer desde una unidad de cinta, desde una unidad de red montada de forma remota o desde un HFS con un robot de cinta en el extremo posterior. (Pero el objetivo principal de su respuesta es correcto.)
Stephen C
1
@StephenC +1 por tu comentario y ejemplos. Para agregar más su ejemplo, un caso simple podría ser donde las conexiones de socket se realizaron correctamente pero el intento de lectura se bloqueó ya que los datos se iban a buscar de DB pero de alguna manera no sucedió (digamos que DB no estaba respondiendo y la consulta se fue en estado bloqueado). En este escenario, debe tener una manera de expirar explícitamente la operación de lectura en el socket.
sábado
1
El objetivo de la abstracción InputStream es no pensar en la implementación subyacente. Es justo discutir sobre los pros y los contras de las respuestas publicadas. Sin embargo, a cuestionar el planteamiento del problema, no va a ayudar al disussion
pellucide
2
InputStream funciona en una secuencia y bloquea, sin embargo, no proporciona un mecanismo de tiempo de espera. Por lo tanto, la abstracción InputStream no es una abstracción diseñada adecuadamente. Por lo tanto, pedir una forma de tiempo de espera en una transmisión no es pedir mucho. Entonces, la pregunta es pedir una solución a un problema muy práctico. La mayoría de las implementaciones subyacentes se bloquearán. Esa es la esencia misma de una corriente. Los sockets, los archivos y las tuberías se bloquearán si el otro lado de la transmisión no está listo con datos nuevos.
Pellucidio
2
@EJP. No sé cómo conseguiste eso. No estuve de acuerdo contigo. La declaración del problema "cómo agotar el tiempo de espera en un InputStream" es válida. Dado que el marco no proporciona una forma de tiempo de espera, es apropiado hacer esa pregunta.
pellucidio
7

No he usado las clases del paquete Java NIO, pero parece que pueden ser de alguna ayuda aquí. Específicamente, java.nio.channels.Channels y java.nio.channels.InterruptibleChannel .

jt.
fuente
2
+1: No creo que haya una manera confiable de hacer lo que el OP está pidiendo solo con InputStream. Sin embargo, nio fue creado para este propósito, entre otros.
Eddie el
2
OP ya ha descartado básicamente esto. InputStreams son inherentemente bloqueantes y pueden no ser interrumpibles.
Marqués de Lorne
5

Aquí hay una manera de obtener un NIO FileChannel de System.in y verificar la disponibilidad de datos utilizando un tiempo de espera, que es un caso especial del problema descrito en la pregunta. Ejecútelo en la consola, no escriba ninguna entrada y espere los resultados. Fue probado con éxito bajo Java 6 en Windows y Linux.

import java.io.FileInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;

public class Main {

    static final ByteBuffer buf = ByteBuffer.allocate(4096);

    public static void main(String[] args) {

        long timeout = 1000 * 5;

        try {
            InputStream in = extract(System.in);
            if (! (in instanceof FileInputStream))
                throw new RuntimeException(
                        "Could not extract a FileInputStream from STDIN.");

            try {
                int ret = maybeAvailable((FileInputStream)in, timeout);
                System.out.println(
                        Integer.toString(ret) + " bytes were read.");

            } finally {
                in.close();
            }

        } catch (Exception e) {
            throw new RuntimeException(e);
        }

    }

    /* unravels all layers of FilterInputStream wrappers to get to the
     * core InputStream
     */
    public static InputStream extract(InputStream in)
            throws NoSuchFieldException, IllegalAccessException {

        Field f = FilterInputStream.class.getDeclaredField("in");
        f.setAccessible(true);

        while( in instanceof FilterInputStream )
            in = (InputStream)f.get((FilterInputStream)in);

        return in;
    }

    /* Returns the number of bytes which could be read from the stream,
     * timing out after the specified number of milliseconds.
     * Returns 0 on timeout (because no bytes could be read)
     * and -1 for end of stream.
     */
    public static int maybeAvailable(final FileInputStream in, long timeout)
            throws IOException, InterruptedException {

        final int[] dataReady = {0};
        final IOException[] maybeException = {null};
        final Thread reader = new Thread() {
            public void run() {                
                try {
                    dataReady[0] = in.getChannel().read(buf);
                } catch (ClosedByInterruptException e) {
                    System.err.println("Reader interrupted.");
                } catch (IOException e) {
                    maybeException[0] = e;
                }
            }
        };

        Thread interruptor = new Thread() {
            public void run() {
                reader.interrupt();
            }
        };

        reader.start();
        for(;;) {

            reader.join(timeout);
            if (!reader.isAlive())
                break;

            interruptor.start();
            interruptor.join(1000);
            reader.join(1000);
            if (!reader.isAlive())
                break;

            System.err.println("We're hung");
            System.exit(1);
        }

        if ( maybeException[0] != null )
            throw maybeException[0];

        return dataReady[0];
    }
}

Curiosamente, cuando se ejecuta el programa dentro de NetBeans 6.5 en lugar de en la consola, el tiempo de espera no funciona en absoluto, y la llamada a System.exit () es realmente necesaria para matar los hilos de zombies. Lo que sucede es que el hilo del interruptor bloquea (!) En la llamada a reader.interrupt (). Otro programa de prueba (que no se muestra aquí) también intenta cerrar el canal, pero tampoco funciona.


fuente
no funciona en mac os, ni con JDK 1.6 ni con JDK 1.7. La interrupción solo se reconoce después de presionar Retorno durante la lectura.
Mostowski Collapse
4

Como dijo jt, NIO es la mejor (y correcta) solución. Sin embargo, si realmente está atrapado con un InputStream, podría

  1. Genera un hilo cuyo trabajo exclusivo es leer el InputStream y poner el resultado en un búfer que se puede leer desde su hilo original sin bloquear. Esto debería funcionar bien si solo tiene una instancia de la transmisión. De lo contrario, es posible que pueda eliminar el hilo utilizando los métodos obsoletos en la clase Thread, aunque esto puede causar pérdidas de recursos.

  2. Confíe en isAvailable para indicar datos que se pueden leer sin bloquear. Sin embargo, en algunos casos (como con Sockets) puede tomar una lectura potencialmente bloqueante para isAvailable para informar algo distinto de 0.


fuente
55
Socket.setSoTimeout()es una solución igualmente correcta y mucho más simple. O HttpURLConnection.setReadTimeout().
Marqués de Lorne
3
@EJP: estos son solo "igualmente correctos" bajo ciertas circunstancias; por ejemplo, si la secuencia de entrada es una secuencia de socket / secuencia de conexión HTTP.
Stephen C
1
@Stephen C NIO solo no se bloquea y se puede seleccionar en las mismas circunstancias. No hay E / S de archivo sin bloqueo, por ejemplo.
Marqués de Lorne
2
@EJP pero hay E / S de tubería sin bloqueo (System.in), E / S sin bloqueo para archivos (en disco local) no tiene sentido
woky
1
@EJP En la mayoría (¿todos?) Unices System.in es en realidad una tubería (si no le dijiste a Shell que lo reemplace con el archivo) y, como tubería, puede no bloquearse.
woky
0

Inspirado en esta respuesta, se me ocurrió una solución un poco más orientada a objetos.

Esto solo es válido si tiene la intención de leer caracteres

Puede anular BufferedReader e implementar algo como esto:

public class SafeBufferedReader extends BufferedReader{

    private long millisTimeout;

    ( . . . )

    @Override
    public int read(char[] cbuf, int off, int len) throws IOException {
        try {
            waitReady();
        } catch(IllegalThreadStateException e) {
            return 0;
        }
        return super.read(cbuf, off, len);
    }

    protected void waitReady() throws IllegalThreadStateException, IOException {
        if(ready()) return;
        long timeout = System.currentTimeMillis() + millisTimeout;
        while(System.currentTimeMillis() < timeout) {
            if(ready()) return;
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                break; // Should restore flag
            }
        }
        if(ready()) return; // Just in case.
        throw new IllegalThreadStateException("Read timed out");
    }
}

Aquí hay un ejemplo casi completo.

Estoy devolviendo 0 en algunos métodos, debería cambiarlo a -2 para satisfacer sus necesidades, pero creo que 0 es más adecuado con el contrato BufferedReader. No pasó nada malo, solo leía 0 caracteres. El método readLine es un asesino de rendimiento horrible. Debe crear un BufferedReader completamente nuevo si realmente desea usar readLin e. En este momento, no es seguro para subprocesos. Si alguien invoca una operación mientras readLines está esperando una línea, producirá resultados inesperados.

No me gusta regresar -2 donde estoy. Lanzaría una excepción porque algunas personas pueden simplemente verificar si int <0 para considerar EOS. De todos modos, esos métodos afirman que "no se puede bloquear", debe verificar si esa declaración es realmente cierta y simplemente no anularlos.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.nio.CharBuffer;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

/**
 * 
 * readLine
 * 
 * @author Dario
 *
 */
public class SafeBufferedReader extends BufferedReader{

    private long millisTimeout;

    private long millisInterval = 100;

    private int lookAheadLine;

    public SafeBufferedReader(Reader in, int sz, long millisTimeout) {
        super(in, sz);
        this.millisTimeout = millisTimeout;
    }

    public SafeBufferedReader(Reader in, long millisTimeout) {
        super(in);
        this.millisTimeout = millisTimeout;
    }



    /**
     * This is probably going to kill readLine performance. You should study BufferedReader and completly override the method.
     * 
     * It should mark the position, then perform its normal operation in a nonblocking way, and if it reaches the timeout then reset position and throw IllegalThreadStateException
     * 
     */
    @Override
    public String readLine() throws IOException {
        try {
            waitReadyLine();
        } catch(IllegalThreadStateException e) {
            //return null; //Null usually means EOS here, so we can't.
            throw e;
        }
        return super.readLine();
    }

    @Override
    public int read() throws IOException {
        try {
            waitReady();
        } catch(IllegalThreadStateException e) {
            return -2; // I'd throw a runtime here, as some people may just be checking if int < 0 to consider EOS
        }
        return super.read();
    }

    @Override
    public int read(char[] cbuf) throws IOException {
        try {
            waitReady();
        } catch(IllegalThreadStateException e) {
            return -2;  // I'd throw a runtime here, as some people may just be checking if int < 0 to consider EOS
        }
        return super.read(cbuf);
    }

    @Override
    public int read(char[] cbuf, int off, int len) throws IOException {
        try {
            waitReady();
        } catch(IllegalThreadStateException e) {
            return 0;
        }
        return super.read(cbuf, off, len);
    }

    @Override
    public int read(CharBuffer target) throws IOException {
        try {
            waitReady();
        } catch(IllegalThreadStateException e) {
            return 0;
        }
        return super.read(target);
    }

    @Override
    public void mark(int readAheadLimit) throws IOException {
        super.mark(readAheadLimit);
    }

    @Override
    public Stream<String> lines() {
        return super.lines();
    }

    @Override
    public void reset() throws IOException {
        super.reset();
    }

    @Override
    public long skip(long n) throws IOException {
        return super.skip(n);
    }

    public long getMillisTimeout() {
        return millisTimeout;
    }

    public void setMillisTimeout(long millisTimeout) {
        this.millisTimeout = millisTimeout;
    }

    public void setTimeout(long timeout, TimeUnit unit) {
        this.millisTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
    }

    public long getMillisInterval() {
        return millisInterval;
    }

    public void setMillisInterval(long millisInterval) {
        this.millisInterval = millisInterval;
    }

    public void setInterval(long time, TimeUnit unit) {
        this.millisInterval = TimeUnit.MILLISECONDS.convert(time, unit);
    }

    /**
     * This is actually forcing us to read the buffer twice in order to determine a line is actually ready.
     * 
     * @throws IllegalThreadStateException
     * @throws IOException
     */
    protected void waitReadyLine() throws IllegalThreadStateException, IOException {
        long timeout = System.currentTimeMillis() + millisTimeout;
        waitReady();

        super.mark(lookAheadLine);
        try {
            while(System.currentTimeMillis() < timeout) {
                while(ready()) {
                    int charInt = super.read();
                    if(charInt==-1) return; // EOS reached
                    char character = (char) charInt;
                    if(character == '\n' || character == '\r' ) return;
                }
                try {
                    Thread.sleep(millisInterval);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // Restore flag
                    break;
                }
            }
        } finally {
            super.reset();
        }
        throw new IllegalThreadStateException("readLine timed out");

    }

    protected void waitReady() throws IllegalThreadStateException, IOException {
        if(ready()) return;
        long timeout = System.currentTimeMillis() + millisTimeout;
        while(System.currentTimeMillis() < timeout) {
            if(ready()) return;
            try {
                Thread.sleep(millisInterval);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // Restore flag
                break;
            }
        }
        if(ready()) return; // Just in case.
        throw new IllegalThreadStateException("read timed out");
    }

}
DGoiko
fuente