Lectura sin bloqueo en un subproceso.PIPE en python

507

Estoy usando el módulo de subproceso para iniciar un subproceso y conectarme a su flujo de salida (stdout). Quiero poder ejecutar lecturas sin bloqueo en su stdout. ¿Hay alguna manera de hacer que .readline no bloquee o verificar si hay datos en la transmisión antes de invocar .readline? Me gustaría que esto sea portátil o que al menos funcione en Windows y Linux.

así es como lo hago por ahora (está bloqueando .readlinesi no hay datos disponibles):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
Mathieu Pagé
fuente
14
(¿Viene de google?) Todos los PIPE se estancarán cuando uno de los búferes de PIPE se llene y no se lea. por ejemplo, punto muerto stdout cuando se llena stderr. Nunca pase un TUBO que no tenga intención de leer.
Nasser Al-Wohaibi
@ NasserAl-Wohaibi, ¿significa esto que es mejor crear siempre archivos?
Charlie Parker
Algo que he tenido curiosidad por entender es por qué está bloqueando en primer lugar ... Pregunto porque he visto el comentario:To avoid deadlocks: careful to: add \n to output, flush output, use readline() rather than read()
Charlie Parker el
Es, "por diseño", esperando recibir entradas.
Mathieu Pagé
relacionado: stackoverflow.com/q/19880190/240515
user240515

Respuestas:

403

fcntl, select, asyncprocNo ayudará en este caso.

Una forma confiable de leer una secuencia sin bloquear independientemente del sistema operativo es usar Queue.get_nowait():

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line
jfs
fuente
66
Sí, esto funciona para mí, aunque eliminé mucho. Incluye buenas prácticas pero no siempre es necesario. Python 3.x 2.X compat y close_fds pueden omitirse, aún funcionará. ¡Pero tenga en cuenta lo que hace todo y no lo copie a ciegas, incluso si simplemente funciona! (En realidad, la solución más simple es usar un hilo y hacer una línea de lectura como lo hizo Seb, Qeues es solo una manera fácil de obtener los datos, hay otros, ¡los hilos son la respuesta!)
Aki,
3
Dentro del hilo, la llamada para out.readlinebloquear el hilo y el hilo principal, y tengo que esperar hasta que readline regrese antes de que todo lo demás continúe. ¿Alguna forma fácil de evitar eso? (Estoy leyendo varias líneas de mi proceso, que también es otro archivo .py que está haciendo DB y cosas)
Justin
3
@Justin: 'out.readline' no bloquea el hilo principal, se ejecuta en otro hilo.
jfs
44
¿Qué pasa si no puedo cerrar el subproceso, por ejemplo. debido a excepciones? el hilo del lector stdout no morirá y Python se colgará, incluso si el hilo principal salió, ¿no? ¿Cómo podría uno solucionar esto? python 2.x no admite matar los hilos, lo que es peor, no admite interrumpirlos :( (obviamente, uno debe manejar las excepciones para asegurar que el subproceso se cierre, pero en caso de que no lo haga, ¿qué puede hacer?)
n611x007
3
He creado algunos envoltorios amigables de esto en el paquete shelljob pypi.python.org/pypi/shelljob
edA-qa mort-ora-y
77

A menudo he tenido un problema similar; Los programas de Python que escribo con frecuencia necesitan tener la capacidad de ejecutar alguna funcionalidad primaria al mismo tiempo que aceptan la entrada del usuario desde la línea de comandos (stdin). Simplemente poner la funcionalidad de manejo de entrada del usuario en otro hilo no resuelve el problema porque readline()bloquea y no tiene tiempo de espera. Si la funcionalidad principal está completa y ya no hay necesidad de esperar más entradas del usuario, normalmente quiero que mi programa salga, pero no puede porque readline()todavía está bloqueando en el otro hilo esperando una línea. Una solución que he encontrado para este problema es hacer que stdin sea un archivo sin bloqueo usando el módulo fcntl:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

En mi opinión, esto es un poco más limpio que usar los módulos de selección o señal para resolver este problema, pero de nuevo solo funciona en UNIX ...

Jesse
fuente
1
Según los documentos, fcntl () puede recibir un descriptor de archivo o un objeto que tiene el método .fileno ().
Denilson Sá Maia
10
La respuesta de Jesse no es correcta. Según Guido, readline no funciona correctamente con el modo sin bloqueo, y no lo hará antes de Python 3000. bugs.python.org/issue1175#msg56041 Si desea usar fcntl para configurar el archivo en modo sin bloqueo, debe usar el os.read () de nivel inferior y separar las líneas usted mismo. Mezclar fcntl con llamadas de alto nivel que realizan almacenamiento en línea es pedir problemas.
anonnn
2
El uso de readline parece incorrecto en Python 2. Ver la respuesta de anonnn stackoverflow.com/questions/375427/…
Catalin Iacob
10
Por favor, no use bucles ocupados. Use poll () con un tiempo de espera para esperar los datos.
Ivo Danihelka
@Stefano ¿cómo se buffer_sizedefine?
gato
39

Python 3.4 presenta una nueva API provisional para el asynciomódulo de E / S asíncrono .

El enfoque es similar a la twistedrespuesta basada en @Bryan Ward : defina un protocolo y sus métodos se invocan tan pronto como los datos estén listos:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

Consulte "Subproceso" en los documentos .

Hay una interfaz de alto nivel asyncio.create_subprocess_exec()que devuelve Processobjetos que permiten leer una línea de forma asincrónica usando la StreamReader.readline()rutina (con async/ awaitPython 3.5+ sintaxis ):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() realiza las siguientes tareas:

  • iniciar subproceso, redirigir su stdout a una tubería
  • leer una línea del subproceso stdout de forma asincrónica
  • matar subproceso
  • espera a que salga

Cada paso podría estar limitado por segundos de tiempo de espera si es necesario.

jfs
fuente
Cuando intento algo como esto usando las corutinas de Python 3.4, solo obtengo resultados una vez que se ha ejecutado todo el script. Me gustaría ver una línea de salida impresa, tan pronto como el subproceso imprima una línea. Esto es lo que tengo: pastebin.com/qPssFGep .
flutefreak7
1
@ flutefreak7: los problemas de almacenamiento en búfer no están relacionados con la pregunta actual. Siga el enlace para posibles soluciones.
jfs
¡Gracias! Solucioné el problema de mi script simplemente usando print(text, flush=True)para que el texto impreso estuviera inmediatamente disponible para la llamada del observador readline. Cuando lo probé con el ejecutable basado en Fortran que realmente quiero envolver / mirar, no almacena su salida en el búfer, por lo que se comporta como se esperaba.
flutefreak7
¿Es posible permitir que el subproceso persista y realizar más operaciones de lectura / escritura? readline_and_kill, en su segundo script, funciona de manera muy similar subprocess.comunicatea que termina el proceso después de una operación de lectura / escritura. También veo que está utilizando una sola tubería stdout, que el subproceso maneja como no bloqueante. Intento usar ambos stdouty stderr encuentro que termino bloqueando .
Carel
@Carel, el código en la respuesta funciona según lo previsto, como se describe explícitamente en la respuesta. Es posible implementar otro comportamiento si lo desea. Ambas canalizaciones son igualmente sin bloqueo si se usan, aquí hay un ejemplo de cómo leer ambas canalizaciones simultáneamente .
jfs
19

Pruebe el módulo asyncproc . Por ejemplo:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

El módulo se encarga de todos los subprocesos según lo sugerido por S.Lott.

Noé
fuente
1
Absolutamente brillante. Mucho más fácil que el módulo de subproceso sin procesar. Funciona perfectamente para mí en Ubuntu.
Cerin
12
asyncproc no funciona en windows, y windows no es compatible con os.WNOHANG :-(
Bryan Oakley
26
asyncproc es GPL, lo que limita aún más su uso :-(
Bryan Oakley
Gracias. Una pequeña cosa: parece que reemplazar pestañas con 8 espacios en asyncproc.py es el camino a seguir :)
benjaoming
No parece que pueda obtener el código de retorno del proceso que inició a través del módulo asyncproc; solo la salida que generó.
grayaii
17

Puedes hacer esto muy fácilmente en Twisted . Dependiendo de su base de código existente, esto podría no ser tan fácil de usar, pero si está creando una aplicación retorcida, cosas como esta se vuelven casi triviales. Crea una ProcessProtocolclase y anula el outReceived()método. Retorcido (dependiendo del reactor utilizado) generalmente es solo un gran select()bucle con devoluciones de llamada instaladas para manejar datos de diferentes descriptores de archivos (a menudo tomas de red). Entonces, el outReceived()método es simplemente instalar una devolución de llamada para manejar los datos provenientes de STDOUT. Un ejemplo simple que demuestra este comportamiento es el siguiente:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

La documentación de Twisted tiene buena información al respecto.

Si construye toda su aplicación alrededor de Twisted, hace que la comunicación asincrónica con otros procesos, locales o remotos, sea realmente elegante como esta. Por otro lado, si su programa no está construido sobre Twisted, esto realmente no será tan útil. Esperemos que esto pueda ser útil para otros lectores, incluso si no es aplicable para su aplicación en particular.

Bryan Ward
fuente
no es bueno. selectno debería funcionar en ventanas con descriptores de archivos, de acuerdo con los documentos
n611x007
2
@naxa No creo select()que se refiera al mismo que tú. Asumo esto porque Twistedlas obras en las ventanas ...
notbad.jpeg
1
"Retorcido (dependiendo del reactor utilizado) generalmente es solo un gran bucle select ()" significa que hay varios reactores para elegir. El select()uno es el más portátil en Unixes y Me gusta, pero también hay dos reactores disponibles para Windows: twistedmatrix.com/documents/current/core/howto/…
clacke
14

Use select & read (1).

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

Para readline () - como:

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a
Andy Jackson
fuente
66
no es bueno. selectno debería funcionar en ventanas con descriptores de archivos, de acuerdo con los documentos
n611x007
DIOS MIO. Leer megabytes, o posiblemente gigabytes, un carácter a la vez ... esa es la peor idea que he visto en mucho tiempo ... no hace falta mencionar que este código no funciona, porque proc.stdout.read()no importa cuán pequeño sea el argumento Una llamada de bloqueo.
wvxvw
OSError: [WinError 10093] Either the application has not called WSAStartup, or WSAStartup failed
nmz787
8

Una solución es hacer otro proceso para realizar su lectura del proceso, o hacer un hilo del proceso con un tiempo de espera.

Aquí está la versión roscada de una función de tiempo de espera:

http://code.activestate.com/recipes/473878/

Sin embargo, ¿necesita leer el stdout a medida que entra? Otra solución puede ser volcar la salida en un archivo y esperar a que el proceso termine de usar p.wait () .

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()
Monkut
fuente
parece que el hilo de recpie no saldría después del tiempo de espera y matarlo depende de poder matar el subproceso (sg. de lo contrario no está relacionado a este respecto) se lee (algo que debería poder pero solo en caso de que no pueda ...) .
n611x007
7

Descargo de responsabilidad: esto solo funciona para tornado

Puede hacer esto configurando el fd para que no se bloquee y luego use ioloop para registrar devoluciones de llamada. He empaquetado esto en un huevo llamado tornado_subprocess y puedes instalarlo a través de PyPI:

easy_install tornado_subprocess

ahora puedes hacer algo como esto:

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

también puedes usarlo con un RequestHandler

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()
Vukasin Toroman
fuente
Gracias por la buena característica! Solo para aclarar, ¿por qué no podemos simplemente usarlo threading.Threadpara crear nuevos procesos sin bloqueo? Lo utilicé en la on_messageinstancia de Websocket de Tornado, y funcionó bien.
VisioN
1
El enhebrado se desaconseja principalmente en los tornados. están bien para funciones pequeñas y de ejecución corta. Puede leer sobre esto aquí: stackoverflow.com/questions/7846323/tornado-web-and-threads github.com/facebook/tornado/wiki/Threading-and-concurrency
Vukasin Toroman
@VukasinToroman, realmente me salvaste aquí con esto. muchas gracias por el módulo tornado_subprocess :)
James Gentes
funciona esto en windows? (tenga en cuenta que select, con los descriptores de archivo, no )
n611x007
Esta biblioteca no usa la selectllamada. No lo he intentado en Windows, pero probablemente te encuentres en problemas ya que lib está usando el fcntlmódulo. En resumen: no, esto probablemente no funcionará en Windows.
Vukasin Toroman
6

Las soluciones existentes no me funcionaron (detalles a continuación). Lo que finalmente funcionó fue implementar readline usando read (1) (basado en esta respuesta ). Este último no bloquea:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

Por qué las soluciones existentes no funcionaron:

  1. Las soluciones que requieren readline (incluidas las basadas en cola) siempre se bloquean. Es difícil (¿imposible?) Matar el hilo que ejecuta readline. Solo se elimina cuando finaliza el proceso que lo creó, pero no cuando se finaliza el proceso de producción de salida.
  2. La combinación de fcntl de bajo nivel con llamadas de línea de lectura de alto nivel puede no funcionar correctamente como anonnn ha señalado.
  3. Usar select.poll () es bueno, pero no funciona en Windows de acuerdo con los documentos de Python.
  4. El uso de bibliotecas de terceros parece excesivo para esta tarea y agrega dependencias adicionales.
Vikram Pudi
fuente
1
1. q.get_nowait()de mi respuesta no debe bloquear, nunca, ese es el punto de usarlo. 2. El subproceso que ejecuta readline ( enqueue_output()función ) sale en EOF, por ejemplo, incluido el caso en que se mata el proceso de producción de salida. Si crees que no es así; proporcione un ejemplo de código mínimo completo que muestre lo contrario (tal vez como una nueva pregunta ).
jfs
1
@sebastian Pasé una hora o más tratando de encontrar un ejemplo mínimo. Al final, debo aceptar que su respuesta maneja todos los casos. Supongo que no funcionó antes para mí porque cuando estaba tratando de matar el proceso de producción de salida, ya estaba apagado y me dio un error difícil de depurar. La hora fue bien gastada, porque si bien se me ocurrió un ejemplo mínimo, podría encontrar una solución más simple.
Vikram Pudi
¿Podría publicar la solución más simple también? :) (si es diferente de la de Sebastian)
n611x007
@ danger89: creo dcmpid = myprocess.
ViFI
En condición después de la llamada read () (justo después de True): out nunca será una cadena vacía porque lee al menos string / bytes con una longitud de 1.
sergzach
6

Aquí está mi código, utilizado para capturar cada salida del subproceso lo antes posible, incluidas las líneas parciales. Bombea al mismo tiempo y stdout y stderr en el orden casi correcto.

Probado y funcionado correctamente en Python 2.7 Linux y Windows.

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()
datacompboy
fuente
Una de las pocas respuestas que le permiten leer cosas que no necesariamente terminan con una nueva línea.
total
5

Agrego este problema para leer algunos subprocesos. Abrir stdout. Aquí está mi solución de lectura sin bloqueo:

import fcntl

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'
Sebastien Claeys
fuente
55
fcntl no funciona en windows, según los documentos .
n611x007
@anatolytechtonik usar msvcrt.kbhit()en su lugar
gato
4

Esta versión de lectura sin bloqueo no requiere módulos especiales y funcionará de forma inmediata en la mayoría de las distribuciones de Linux.

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, async=True):
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())
Tom Lime
fuente
3

Aquí hay una solución simple basada en hilos que:

  • funciona tanto en Linux como en Windows (sin depender de él select).
  • lee ambos stdouty stderrasincrónicamente.
  • no se basa en encuestas activas con tiempo de espera arbitrario (compatible con CPU).
  • no usa asyncio(lo que puede entrar en conflicto con otras bibliotecas).
  • se ejecuta hasta que finaliza el proceso secundario.

impresora.py

import time
import sys

sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)

reader.py

import queue
import subprocess
import sys
import threading


def enqueue_stream(stream, queue, type):
    for line in iter(stream.readline, b''):
        queue.put(str(type) + line.decode('utf-8'))
    stream.close()


def enqueue_process(process, queue):
    process.wait()
    queue.put('x')


p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()

while True:
    line = q.get()
    if line[0] == 'x':
        break
    if line[0] == '2':  # stderr
        sys.stdout.write("\033[0;31m")  # ANSI red color
    sys.stdout.write(line[1:])
    if line[0] == '2':
        sys.stdout.write("\033[0m")  # reset ANSI code
    sys.stdout.flush()

tp.join()
to.join()
te.join()
Olivier Michel
fuente
2

Agregando esta respuesta aquí, ya que proporciona la capacidad de establecer tuberías sin bloqueo en Windows y Unix.

Todos los ctypesdetalles son gracias a la respuesta de @ techtonik .

Hay una versión ligeramente modificada para ser utilizada tanto en sistemas Unix como Windows.

  • Compatible con Python3 (solo se necesitan cambios menores) .
  • Incluye la versión posix, y define la excepción para usar para cualquiera.

De esta manera, puede usar la misma función y excepción para el código Unix y Windows.

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # Constant could define globally but avoid polluting the name-space
        # thanks to: /programming/34504970
        import msvcrt

        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        def pipe_no_wait(pipefd):
            SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
            SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
            SetNamedPipeHandleState.restype = BOOL

            h = msvcrt.get_osfhandle(pipefd)

            res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
            if res == 0:
                print(WinError())
                return False
            return True

        return pipe_no_wait(fd)

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232

        return (GetLastError() == ERROR_NO_DATA)

    PortableBlockingIOError = OSError
else:
    def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        return True

    PortableBlockingIOError = BlockingIOError

Para evitar leer datos incompletos, terminé escribiendo mi propio generador de línea de lectura (que devuelve la cadena de bytes para cada línea).

Es un generador para que pueda, por ejemplo ...

def non_blocking_readlines(f, chunk=1024):
    """
    Iterate over lines, yielding b'' when nothings left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case were reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'\n')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)
ideasman42
fuente
(1) este comentario indica que readline()no funciona con tuberías sin bloqueo (como el uso de conjuntos fcntl) en Python 2: ¿cree que ya no es correcto? (mi respuesta contiene el enlace ( fcntl) que proporciona la misma información pero parece eliminado ahora). (2) Vea cómo se multiprocessing.connection.PipeutilizaSetNamedPipeHandleState
jfs
Solo probé esto en Python3. Pero también vi esta información y espero que siga siendo válida. También escribí mi propio código para usar en lugar de readline, he actualizado mi respuesta para incluirlo.
ideasman42
2

Tengo el problema del interrogador original, pero no quería invocar hilos. Mezclé la solución de Jesse con una lectura directa () desde la tubería, y mi propio controlador de búfer para lecturas de línea (sin embargo, mi subproceso - ping - siempre escribía líneas completas <un tamaño de página del sistema). Evito la espera ocupada solo leyendo en un reloj io registrado por un objeto. En estos días, generalmente ejecuto código dentro de un objeto principal MainLoop para evitar subprocesos.

def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down

El observador es

def watch(f, *other):
print 'reading',f.read()
return True

Y el programa principal configura un ping y luego llama al bucle de correo gobject.

def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()

Cualquier otro trabajo se adjunta a devoluciones de llamada en gobject.

Dave Kitchen
fuente
2

Las cosas están mucho mejor en Python moderno.

Aquí hay un programa secundario simple, "hello.py":

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello {i}")

Y un programa para interactuar con él:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit\n")
    await proc.wait()


asyncio.run(main())

Eso se imprime:

b'hello bob\n'
b'hello alice\n'

Tenga en cuenta que el patrón real, que también se encuentra en casi todas las respuestas anteriores, tanto aquí como en preguntas relacionadas, es establecer el descriptor de archivo stdout del niño en no bloqueante y luego sondearlo en algún tipo de ciclo de selección. En estos días, por supuesto, ese bucle lo proporciona asyncio.

usuario240515
fuente
1

El módulo de selección le ayuda a determinar dónde está la siguiente entrada útil.

Sin embargo, casi siempre eres más feliz con hilos separados. Uno hace un bloqueo de lectura del stdin, otro lo hace donde sea que no desee que se bloquee.

S.Lott
fuente
11
Creo que esta respuesta no es útil por dos razones: (a) El módulo de selección no funcionará en tuberías bajo Windows (como se indica claramente en el enlace proporcionado), lo que anula las intenciones del OP de tener una solución portátil. (b) Los subprocesos asincrónicos no permiten un diálogo sincrónico entre el proceso primario y el secundario. ¿Qué sucede si el proceso padre desea enviar la siguiente acción de acuerdo con la siguiente línea leída del hijo?
ThomasH
44
select tampoco es útil ya que las lecturas de Python se bloquearán incluso después de la selección, porque no tiene una semántica C estándar y no devolverá datos parciales.
Helmut Grohne
Un umbral separado para leer de la salida del niño resolvió mi problema que era similar a esto. Si necesita una interacción sincrónica, supongo que no puede usar esta solución (a menos que sepa qué salida esperar). Hubiera aceptado esta respuesta
Emiliano
1

¿Por qué molestar hilo y cola? a diferencia de readline (), BufferedReader.read1 () no bloqueará la espera de \ r \ n, devuelve lo antes posible si hay alguna salida entrante.

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print buf,

if __name__ == '__main__':
    __main__()
mfmain
fuente
¿Volverá lo antes posible si no viene nada? Si no es así, está bloqueando.
Mathieu Pagé
@ MathieuPagé tiene razón. read1se bloqueará si la primera lectura subyacente se bloquea, lo que sucede cuando la tubería aún está abierta pero no hay ninguna entrada disponible.
Jack O'Connor
1

En mi caso, necesitaba un módulo de registro que capture la salida de las aplicaciones en segundo plano y la aumente (agregando marcas de tiempo, colores, etc.).

Terminé con un hilo de fondo que hace la E / S real. El siguiente código es solo para plataformas POSIX. Me quité las partes no esenciales.

Si alguien va a usar esta bestia durante largos períodos, considere administrar descriptores abiertos. En mi caso no fue un gran problema.

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)
Dmytro
fuente
1

Mi problema es un poco diferente, ya que quería recopilar stdout y stderr de un proceso en ejecución, pero en última instancia es el mismo ya que quería renderizar la salida en un widget a medida que se genera.

No quería recurrir a muchas de las soluciones alternativas propuestas usando Colas o Subprocesos adicionales, ya que no deberían ser necesarias para realizar una tarea tan común como ejecutar otro script y recopilar su salida.

Después de leer las soluciones propuestas y los documentos de Python, resolví mi problema con la implementación a continuación. Sí, solo funciona para POSIX ya que estoy usando la selectllamada a la función.

Estoy de acuerdo en que los documentos son confusos y la implementación es incómoda para una tarea de script tan común. Creo que las versiones anteriores de Python tienen diferentes valores predeterminados Popeny explicaciones diferentes, por lo que crearon mucha confusión. Esto parece funcionar bien tanto para Python 2.7.12 como para 3.5.2.

La clave era establecer el bufsize=1almacenamiento en línea del búfer y luego universal_newlines=Trueprocesarlo como un archivo de texto en lugar de un binario que parece convertirse en el predeterminado al configurarlo bufsize=1.

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR, DEBUG y VERBOSE son simplemente macros que imprimen la salida al terminal.

Esta solución es IMHO 99.99% efectiva ya que todavía usa la readlinefunción de bloqueo , por lo que asumimos que el subproceso es bueno y genera líneas completas.

Agradezco sus comentarios para mejorar la solución, ya que todavía soy nuevo en Python.

Brooke Wallace
fuente
En este caso particular, puede establecer stderr = subprocess.STDOUT en el constructor de Popen y obtener todos los resultados de cmd.stdout.readline ().
Aaron
Buen ejemplo claro. Estaba teniendo problemas con select.select () pero esto lo resolvió por mí.
maharvey67
0

Trabajando a partir de la respuesta de JF Sebastian, y varias otras fuentes, he reunido un simple administrador de subprocesos. Proporciona la solicitud de lectura sin bloqueo, además de ejecutar varios procesos en paralelo. No utiliza ninguna llamada específica del sistema operativo (que yo sepa) y, por lo tanto, debería funcionar en cualquier lugar.

Está disponible en pypi, así que solo pip install shelljob. Consulte la página del proyecto para ver ejemplos y documentos completos.

edA-qa mort-ora-y
fuente
0

EDITAR: Esta implementación todavía bloquea. Utilice la respuesta de JFSebastian en su lugar.

Intenté la respuesta principal , pero el riesgo adicional y el mantenimiento del código de hilo era preocupante.

Mirando a través del módulo io (y estando limitado a 2.6), encontré BufferedReader. Esta es mi solución sin hilos y sin bloqueo.

import io
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
  while p.poll() == None:
      time.sleep(SLEEP_DELAY)
      while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
          line = buffer.readline()
          # do stuff with the line

  # Handle any remaining output after the process has ended
  while buffer.peek():
    line = buffer.readline()
    # do stuff with the line
romc
fuente
has intentado for line in iter(p.stdout.readline, ""): # do stuff with the line? No tiene subprocesos (subproceso único) y se bloquea cuando el código se bloquea.
jfs
@ jf-sebastian Sí, eventualmente volví a tu respuesta. Mi implementación todavía ocasionalmente bloqueada. Editaré mi respuesta para advertir a otros que no sigan esta ruta.
romc
0

Recientemente me topé con el mismo problema que necesito leer una línea a la vez desde la transmisión (ejecución de cola en subproceso) en modo sin bloqueo. Quería evitar los siguientes problemas: no quemar la CPU, no leer la transmisión por un byte ( como lo hizo readline), etc.

Aquí está mi implementación https://gist.github.com/grubberr/5501e1a9760c3eab5e0a no es compatible con Windows (encuesta), no maneje EOF, pero funciona bien para mí

Grubberr
fuente
la respuesta a base de hilo no no quemar la CPU (puede especificar arbitraria timeoutcomo en su solución) y .readline()lee más de un byte a la vez ( bufsize=1mediante línea de -buffered (sólo es relevante para la escritura)). ¿Qué otros problemas has encontrado? Las respuestas de solo enlace no son muy útiles.
jfs
0

Este es un ejemplo para ejecutar un comando interactivo en un subproceso, y el stdout es interactivo mediante el uso de pseudo terminal. Puede consultar: https://stackoverflow.com/a/43012138/3555925

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

while p.poll() is None:
    r, w, e = select.select([sys.stdin, master_fd], [], [])
    if sys.stdin in r:
        d = os.read(sys.stdin.fileno(), 10240)
        os.write(master_fd, d)
    elif master_fd in r:
        o = os.read(master_fd, 10240)
        if o:
            os.write(sys.stdout.fileno(), o)

# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
Liao
fuente
0

Esta solución utiliza el selectmódulo para "leer los datos disponibles" de una secuencia de E / S. Esta función bloquea inicialmente hasta que los datos estén disponibles, pero luego lee solo los datos que están disponibles y no bloquea más.

Dado que usa el selectmódulo, esto solo funciona en Unix.

El código es totalmente compatible con PEP8.

import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available, then all available data is then read and returned.
    This function returns an empty string when end of stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer
Bradley Odell
fuente
0

También enfrenté el problema descrito por Jesse y lo resolví usando "select" como lo hicieron Bradley , Andy y otros, pero en modo de bloqueo para evitar un bucle ocupado. Utiliza una pipa ficticia como un stdin falso. Seleccione bloques y espere a que esté listo el stdin o la tubería. Cuando se presiona una tecla, stdin desbloquea la selección y el valor de la tecla se puede recuperar con read (1). Cuando un hilo diferente escribe en la tubería, la tubería desbloquea la selección y se puede tomar como una indicación de que la necesidad de stdin ha terminado. Aquí hay un código de referencia:

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    # ... do your stuff with the key value

# Faked keystroke
else:
    # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()
gonzaedu61
fuente
NOTA: Para que esto funcione en Windows, la tubería debe reemplazarse por un zócalo. Todavía no lo intenté, pero debería funcionar de acuerdo con la documentación.
gonzaedu61
0

Pruebe wexpect , que es la alternativa de Windows de pexpect .

import wexpect

p = wexpect.spawn('myprogram.exe')
p.stdout.readline('.')               // regex pattern of any character
output_str = p.after()
betontalpfa
fuente
0

En los sistemas similares a Unix y Python 3.5+ hay os.set_blockingquien hace exactamente lo que dice.

import os
import time
import subprocess

cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
    # first iteration always produces empty byte string in non-blocking mode
    for i in range(2):    
        line = p.stdout.readline()
        print(i, line)
        time.sleep(0.5)
    if time.time() > start + 5:
        break
p.terminate()

Esto produce:

1 b''
2 b'0\n'
1 b''
2 b'1\n'
1 b''
2 b'2\n'
1 b''
2 b'3\n'
1 b''
2 b'4\n'

Con os.set_blockingcomentado es:

0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''
saaj
fuente
-2

Aquí hay un módulo que admite lecturas sin bloqueo y escrituras en segundo plano en python:

https://pypi.python.org/pypi/python-nonblock

Proporciona una función

nonblock_read que leerá los datos de la secuencia, si está disponible, de lo contrario devolverá una cadena vacía (o Ninguno si la secuencia está cerrada en el otro lado y se han leído todos los datos posibles)

También puede considerar el módulo python-subprocess2,

https://pypi.python.org/pypi/python-subprocess2

que se agrega al módulo de subproceso. Entonces, en el objeto devuelto por "subprocess.Popen" se agrega un método adicional, runInBackground. Esto inicia un hilo y devuelve un objeto que se completará automáticamente a medida que las cosas se escriben en stdout / stderr, sin bloquear su hilo principal.

¡Disfrutar!

Tim Savannah
fuente
Me gustaría probar este módulo sin bloque , pero soy relativamente nuevo en algunos de los procedimientos de Linux. ¿Exactamente cómo instalo estas rutinas? Estoy ejecutando Raspbian Jessie, una versión de Debian Linux para Raspberry Pi. Intenté 'sudo apt-get install nonblock' y python-nonblock y ambos arrojaron un error, no encontrado. He descargado el archivo zip de este sitio pypi.python.org/pypi/python-nonblock , pero no sé qué hacer con él. Gracias .... RDK
RDK