Sistema de eventos en Python

195

¿Qué sistema de eventos para Python usas? Ya estoy al tanto de pydispatcher , pero me preguntaba qué más se puede encontrar o se usa comúnmente.

No me interesan los administradores de eventos que forman parte de grandes marcos, prefiero usar una pequeña solución básica que pueda ampliar fácilmente.

Josip
fuente

Respuestas:

178

Paquetes PyPI

A partir de junio de 2020, estos son los paquetes relacionados con eventos disponibles en PyPI, ordenados por la fecha de lanzamiento más reciente.

Hay más

Hay muchas bibliotecas para elegir, usando una terminología muy diferente (eventos, señales, controladores, envío de métodos, ganchos, ...).

Estoy tratando de mantener una visión general de los paquetes anteriores, además de las técnicas mencionadas en las respuestas aquí.

Primero, algo de terminología ...

Patrón de observador

El estilo más básico del sistema de eventos es la 'bolsa de métodos de manejo', que es una implementación simple del patrón Observador .

Básicamente, los métodos del controlador (invocables) se almacenan en una matriz y cada uno de ellos se llama cuando el evento 'se dispara'.

Publicar-Suscribirse

La desventaja de los sistemas de eventos de Observer es que solo puede registrar los controladores en el objeto Evento real (o en la lista de controladores). Entonces, en el momento de la inscripción, el evento ya debe existir.

Es por eso que existe el segundo estilo de sistemas de eventos: el patrón de publicación-suscripción . Aquí, los controladores no se registran en un objeto de evento (o lista de controladores), sino en un distribuidor central. Además, los notificadores solo hablan con el despachador. Qué escuchar o qué publicar está determinado por la 'señal', que no es más que un nombre (cadena).

Patrón de mediador

También podría ser de interés: el patrón Mediador .

Manos

Un sistema 'enganchado' se usa habitualmente en el contexto de complementos de aplicaciones. La aplicación contiene puntos de integración fijos (ganchos), y cada complemento puede conectarse a ese gancho y realizar ciertas acciones.

Otros eventos'

Nota: threading.Event no es un 'sistema de eventos' en el sentido anterior. Es un sistema de sincronización de hilos donde un hilo espera hasta que otro hilo 'señale' el objeto Evento.

Las bibliotecas de mensajería de red a menudo usan el término 'eventos' también; a veces estos son similares en concepto; a veces no. Por supuesto, pueden atravesar límites de hilos, procesos y computadoras. Ver, por ejemplo , pyzmq , pymq , Twisted , Tornado , gevent , eventlet .

Referencias débiles

En Python, mantener una referencia a un método u objeto asegura que el recolector de basura no lo eliminará. Esto puede ser deseable, pero también puede provocar pérdidas de memoria: los controladores vinculados nunca se limpian.

Algunos sistemas de eventos usan referencias débiles en lugar de regulares para resolver esto.

Algunas palabras sobre las diferentes bibliotecas.

Sistemas de eventos estilo observador:

  • zope.event muestra los aspectos básicos de cómo funciona esto (ver la respuesta de Lennart ). Nota: este ejemplo ni siquiera admite argumentos de manejador.
  • La implementación de 'lista invocable' de LongPoke muestra que dicho sistema de eventos puede implementarse de manera muy minimalista mediante la subclasificación list.
  • La variación de Felk EventHook también garantiza las firmas de los callejeros y las personas que llaman.
  • El EventHook de spassig (Patrón de eventos de Michael Foord) es una implementación sencilla.
  • La clase Evento de Lecciones Valoradas de Josip es básicamente la misma, pero usa un en setlugar de un listpara almacenar la bolsa, e implementos __call__que son adiciones razonables.
  • PyNotify es similar en concepto y también proporciona conceptos adicionales de variables y condiciones ('evento de cambio de variable'). La página de inicio no es funcional.
  • Axel es básicamente una bolsa de manipuladores con más características relacionadas con el enhebrado, el manejo de errores, ...
  • python-dispatch requiere que se deriven las clases de origen pares pydispatch.Dispatcher.
  • buslane está basado en clases, admite manejadores únicos o múltiples y facilita sugerencias de tipo extensas.
  • El observador / evento de Pithikos es un diseño liviano.

Bibliotecas de publicación y suscripción:

  • Blinker tiene algunas características ingeniosas, como la desconexión automática y el filtrado basado en el remitente.
  • PyPubSub es un paquete estable y promete "características avanzadas que facilitan la depuración y el mantenimiento de temas y mensajes".
  • pymitter es un puerto Python de Node.js EventEmitter2 y ofrece espacios de nombres, comodines y TTL.
  • PyDispatcher parece enfatizar la flexibilidad con respecto a la publicación de muchos a muchos, etc. Admite referencias débiles.
  • louie es un PyDispatcher reelaborado y debería funcionar "en una amplia variedad de contextos".
  • pypydispatcher se basa en PyDispatcher (lo has adivinado ...) y también funciona en PyPy.
  • django.dispatch es un PyDispatcher reescrito "con una interfaz más limitada, pero de mayor rendimiento".
  • pyeventdispatcher se basa en el despachador de eventos del framework Symfony de PHP.
  • el despachador se extrajo de django.dispatch pero se está volviendo bastante viejo.
  • El EventManger de Cristian García es una implementación realmente corta.

Otros:

  • Pluggy contiene un sistema de gancho que utilizan los pytestcomplementos.
  • RxPy3 implementa el patrón Observable y permite fusionar eventos, reintentar, etc.
  • Las señales y ranuras de Qt están disponibles en PyQt o PySide2 . Funcionan como devolución de llamada cuando se usan en el mismo hilo o como eventos (usando un bucle de eventos) entre dos hilos diferentes. Las señales y las ranuras tienen la limitación de que solo funcionan en objetos de clases que se derivan QObject.
florisla
fuente
2
También hay louie, que se basa en PyDispatcher: pypi.python.org/pypi/Louie/1.1
the979kid
@ the979kid louie parece estar mal mantenido, la página de pypi enlaza con 404s en GitHub: 11craft.github.io/louie ; github.com/gldnspud/louie . Debería ser github.com/11craft/louie .
florisla
1
Los oyentes de eventos débiles son una necesidad común. De lo contrario, el uso en el mundo real se vuelve arduo. Una nota sobre las soluciones compatibles que pueden ser útiles.
kxr
Pypubsub 4 es muchos a muchos, y tiene poderosas herramientas de depuración para mensajes, y varias formas de restringir la carga de mensajes para que sepa antes cuando ha enviado datos no válidos o datos faltantes. PyPubSub 4 es compatible con Python 3 (y PyPubSub 3.x es compatible con Python 2).
Oliver
Recientemente publiqué una biblioteca llamada pymq github.com/thrau/pymq que puede ser una buena opción para esta lista.
Thrau
98

Lo he estado haciendo de esta manera:

class Event(list):
    """Event subscription.

    A list of callable objects. Calling an instance of this will cause a
    call to each item in the list in ascending order by index.

    Example Usage:
    >>> def f(x):
    ...     print 'f(%s)' % x
    >>> def g(x):
    ...     print 'g(%s)' % x
    >>> e = Event()
    >>> e()
    >>> e.append(f)
    >>> e(123)
    f(123)
    >>> e.remove(f)
    >>> e()
    >>> e += (f, g)
    >>> e(10)
    f(10)
    g(10)
    >>> del e[0]
    >>> e(2)
    g(2)

    """
    def __call__(self, *args, **kwargs):
        for f in self:
            f(*args, **kwargs)

    def __repr__(self):
        return "Event(%s)" % list.__repr__(self)

Sin embargo, como con todo lo que he visto, no hay pydoc generado automáticamente para esto, y no hay firmas, lo que realmente apesta.

L̲̳o̲̳̳n̲̳̳g̲̳̳p̲̳o̲̳̳k̲̳̳e̲̳̳
fuente
3
Este estilo me parece bastante intrigante. Es dulcemente básico. Me gusta el hecho de que permite manipular eventos y sus suscriptores como operaciones autónomas. Veré cómo le va en un proyecto real.
Rudy Lattae
2
Muy bonito estilo minimalista! ¡súper!
akaRem
2
No puedo votar esto lo suficiente, esto es realmente sencillo y fácil.
2
gran favor, ¿alguien podría explicar esto como si tuviera 10 años? ¿Esta clase es heredada por la clase principal? No veo un init así que super () no se usaría. No está haciendo clic para mí por alguna razón.
omgimdrunk
1
@omgimdrunk Un controlador de eventos simple activaría una o más funciones invocables cada vez que se activara un evento. Una clase para "administrar" esto para usted requeriría los siguientes métodos como mínimo: agregar y disparar. Dentro de esa clase, necesitaría mantener una lista de controladores para ejecutar. Pongamos eso en la variable de instancia _bag_of_handlersque es una lista. El método add de la clase simplemente sería self._bag_of_handlers.append(some_callable). El método de disparo de la clase pasaría a través de '_bag_of_handlers' pasando los args y kwargs proporcionados a los controladores y ejecutaría cada uno en secuencia.
Gabe Spradlin
68

Usamos un EventHook como lo sugiere Michael Foord en su Patrón de evento :

Simplemente agregue EventHooks a sus clases con:

class MyBroadcaster()
    def __init__():
        self.onChange = EventHook()

theBroadcaster = MyBroadcaster()

# add a listener to the event
theBroadcaster.onChange += myFunction

# remove listener from the event
theBroadcaster.onChange -= myFunction

# fire event
theBroadcaster.onChange.fire()

Agregamos la funcionalidad para eliminar a todos los oyentes de un objeto a la clase Michaels y terminamos con esto:

class EventHook(object):

    def __init__(self):
        self.__handlers = []

    def __iadd__(self, handler):
        self.__handlers.append(handler)
        return self

    def __isub__(self, handler):
        self.__handlers.remove(handler)
        return self

    def fire(self, *args, **keywargs):
        for handler in self.__handlers:
            handler(*args, **keywargs)

    def clearObjectHandlers(self, inObject):
        for theHandler in self.__handlers:
            if theHandler.im_self == inObject:
                self -= theHandler
spassig
fuente
Un inconveniente de usar esto es que primero debe agregar un evento antes de registrarse como suscriptor. Si solo los editores agregan sus eventos (no es una obligación, solo una buena práctica), entonces debe inicializar los editores antes que los suscriptores, lo cual es una molestia en grandes proyectos
Jonathan
66
el último método tiene errores porque los controladores self .__ se modifican durante las iteraciones. Solución: `self .__ handlers = [h para h en self .__ handlers if h.im_self! = Obj]`
Simon Bergot el
1
@Simon tiene razón, pero introduce un error porque podemos tener funciones independientes en los controladores de self .__. Arreglo:self.__handlers = [h for h in self._handlers if getattr(h, 'im_self', False) != obj]
Eric Marcos
20

Yo uso zope.event . Son los huesos más desnudos que puedas imaginar. :-) De hecho, aquí está el código fuente completo:

subscribers = []

def notify(event):
    for subscriber in subscribers:
        subscriber(event)

Tenga en cuenta que no puede enviar mensajes entre procesos, por ejemplo. No es un sistema de mensajería, solo un sistema de eventos, nada más y nada menos.

Lennart Regebro
fuente
17
pypi.python.org/pypi/zope.event ... para salvar a los pobres Google alguna ancho de banda ;-)
Boldewyn
Todavía me gustaría poder enviar mensajes. Estaría usando el sistema de eventos en la aplicación construida en Tkinter. No estoy usando su sistema de eventos porque no admite mensajes.
Josip
Puedes enviar lo que quieras con zope.event. Pero mi punto es que no es un sistema de mensajería adecuado, ya que no puede enviar eventos / mensajes a otros procesos u otras computadoras. Probablemente debería ser más específico con sus requisitos.
Lennart Regebro
15

Encontré este pequeño script en Lecciones valiosas . Parece que tiene la proporción correcta de simplicidad / potencia que busco. Peter Thatcher es el autor del siguiente código (no se menciona ninguna licencia).

class Event:
    def __init__(self):
        self.handlers = set()

    def handle(self, handler):
        self.handlers.add(handler)
        return self

    def unhandle(self, handler):
        try:
            self.handlers.remove(handler)
        except:
            raise ValueError("Handler is not handling this event, so cannot unhandle it.")
        return self

    def fire(self, *args, **kargs):
        for handler in self.handlers:
            handler(*args, **kargs)

    def getHandlerCount(self):
        return len(self.handlers)

    __iadd__ = handle
    __isub__ = unhandle
    __call__ = fire
    __len__  = getHandlerCount

class MockFileWatcher:
    def __init__(self):
        self.fileChanged = Event()

    def watchFiles(self):
        source_path = "foo"
        self.fileChanged(source_path)

def log_file_change(source_path):
    print "%r changed." % (source_path,)

def log_file_change2(source_path):
    print "%r changed!" % (source_path,)

watcher              = MockFileWatcher()
watcher.fileChanged += log_file_change2
watcher.fileChanged += log_file_change
watcher.fileChanged -= log_file_change2
watcher.watchFiles()
Josip
fuente
1
Usar un set () en lugar de una lista es bueno para evitar que los controladores se registren dos veces. Una consecuencia es que no se llama a los manejadores en el orden en que se registraron. Aunque no necesariamente es algo malo ...
florisla
1
@florisla podría cambiarlo por OrderedSet, si así lo desea.
Robino
9

Aquí hay un diseño minimalista que debería funcionar bien. Lo que tiene que hacer es simplemente heredar Observeren una clase y luego usar observe(event_name, callback_fn)para escuchar un evento específico. Siempre que se active ese evento específico en cualquier parte del código (es decir Event('USB connected')), se activará la devolución de llamada correspondiente.

class Observer():
    _observers = []
    def __init__(self):
        self._observers.append(self)
        self._observed_events = []
    def observe(self, event_name, callback_fn):
        self._observed_events.append({'event_name' : event_name, 'callback_fn' : callback_fn})


class Event():
    def __init__(self, event_name, *callback_args):
        for observer in Observer._observers:
            for observable in observer._observed_events:
                if observable['event_name'] == event_name:
                    observable['callback_fn'](*callback_args)

Ejemplo:

class Room(Observer):
    def __init__(self):
        print("Room is ready.")
        Observer.__init__(self) # DON'T FORGET THIS
    def someone_arrived(self, who):
        print(who + " has arrived!")

# Observe for specific event
room = Room()
room.observe('someone arrived',  room.someone_arrived)

# Fire some events
Event('someone left',    'John')
Event('someone arrived', 'Lenard') # will output "Lenard has arrived!"
Event('someone Farted',  'Lenard')
Pithikos
fuente
Me gusta su diseño, es minimalista y fácil de entender. y sería ligero al no tener que importar algunos módulos.
Atreyagaurav
8

Creé una EventManagerclase (código al final). La sintaxis es la siguiente:

#Create an event with no listeners assigned to it
EventManager.addEvent( eventName = [] )

#Create an event with listeners assigned to it
EventManager.addEvent( eventName = [fun1, fun2,...] )

#Create any number event with listeners assigned to them
EventManager.addEvent( eventName1 = [e1fun1, e1fun2,...], eventName2 = [e2fun1, e2fun2,...], ... )

#Add or remove listener to an existing event
EventManager.eventName += extra_fun
EventManager.eventName -= removed_fun

#Delete an event
del EventManager.eventName

#Fire the event
EventManager.eventName()

Aquí hay un ejemplo:

def hello(name):
    print "Hello {}".format(name)
    
def greetings(name):
    print "Greetings {}".format(name)

EventManager.addEvent( salute = [greetings] )
EventManager.salute += hello

print "\nInitial salute"
EventManager.salute('Oscar')

print "\nNow remove greetings"
EventManager.salute -= greetings
EventManager.salute('Oscar')

Salida:

Saludo inicial
Saludos Oscar
Hola Oscar

Ahora elimina saludos
Hola Oscar

Código de administrador de eventos:

class EventManager:
    
    class Event:
        def __init__(self,functions):
            if type(functions) is not list:
                raise ValueError("functions parameter has to be a list")
            self.functions = functions
            
        def __iadd__(self,func):
            self.functions.append(func)
            return self
            
        def __isub__(self,func):
            self.functions.remove(func)
            return self
            
        def __call__(self,*args,**kvargs):
            for func in self.functions : func(*args,**kvargs)
            
    @classmethod
    def addEvent(cls,**kvargs):
        """
        addEvent( event1 = [f1,f2,...], event2 = [g1,g2,...], ... )
        creates events using **kvargs to create any number of events. Each event recieves a list of functions,
        where every function in the list recieves the same parameters.
        
        Example:
        
        def hello(): print "Hello ",
        def world(): print "World"
        
        EventManager.addEvent( salute = [hello] )
        EventManager.salute += world
        
        EventManager.salute()
        
        Output:
        Hello World
        """
        for key in kvargs.keys():
            if type(kvargs[key]) is not list:
                raise ValueError("value has to be a list")
            else:
                kvargs[key] = cls.Event(kvargs[key])
        
        cls.__dict__.update(kvargs)
Cristian Garcia
fuente
8

Puede echar un vistazo a pymitter ( pypi ). Es un enfoque pequeño de un solo archivo (~ 250 loc) "que proporciona espacios de nombres, comodines y TTL".

Aquí hay un ejemplo básico:

from pymitter import EventEmitter

ee = EventEmitter()

# decorator usage
@ee.on("myevent")
def handler1(arg):
   print "handler1 called with", arg

# callback usage
def handler2(arg):
    print "handler2 called with", arg
ee.on("myotherevent", handler2)

# emit
ee.emit("myevent", "foo")
# -> "handler1 called with foo"

ee.emit("myotherevent", "bar")
# -> "handler2 called with bar"
Dalailirium
fuente
6

Hice una variación del enfoque minimalista de Longpoke que también garantiza las firmas tanto para los que llaman como para los que llaman:

class EventHook(object):
    '''
    A simple implementation of the Observer-Pattern.
    The user can specify an event signature upon inizializazion,
    defined by kwargs in the form of argumentname=class (e.g. id=int).
    The arguments' types are not checked in this implementation though.
    Callables with a fitting signature can be added with += or removed with -=.
    All listeners can be notified by calling the EventHook class with fitting
    arguments.

    >>> event = EventHook(id=int, data=dict)
    >>> event += lambda id, data: print("%d %s" % (id, data))
    >>> event(id=5, data={"foo": "bar"})
    5 {'foo': 'bar'}

    >>> event = EventHook(id=int)
    >>> event += lambda wrong_name: None
    Traceback (most recent call last):
        ...
    ValueError: Listener must have these arguments: (id=int)

    >>> event = EventHook(id=int)
    >>> event += lambda id: None
    >>> event(wrong_name=0)
    Traceback (most recent call last):
        ...
    ValueError: This EventHook must be called with these arguments: (id=int)
    '''
    def __init__(self, **signature):
        self._signature = signature
        self._argnames = set(signature.keys())
        self._handlers = []

    def _kwargs_str(self):
        return ", ".join(k+"="+v.__name__ for k, v in self._signature.items())

    def __iadd__(self, handler):
        params = inspect.signature(handler).parameters
        valid = True
        argnames = set(n for n in params.keys())
        if argnames != self._argnames:
            valid = False
        for p in params.values():
            if p.kind == p.VAR_KEYWORD:
                valid = True
                break
            if p.kind not in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY):
                valid = False
                break
        if not valid:
            raise ValueError("Listener must have these arguments: (%s)"
                             % self._kwargs_str())
        self._handlers.append(handler)
        return self

    def __isub__(self, handler):
        self._handlers.remove(handler)
        return self

    def __call__(self, *args, **kwargs):
        if args or set(kwargs.keys()) != self._argnames:
            raise ValueError("This EventHook must be called with these " +
                             "keyword arguments: (%s)" % self._kwargs_str())
        for handler in self._handlers[:]:
            handler(**kwargs)

    def __repr__(self):
        return "EventHook(%s)" % self._kwargs_str()
Felk
fuente
3

Si hago código en pyQt, uso el paradigma de zócalos / señales QT, lo mismo es para django

Si estoy haciendo E / S asíncrona, uso el módulo de selección nativo

Si estoy firmando un analizador de python SAX, estoy usando la API de eventos provista por SAX. Entonces parece que soy víctima de la API subyacente :-)

Tal vez debería preguntarse qué espera del marco / módulo de eventos. Mi preferencia personal es usar el paradigma Socket / Signal de QT. Más información sobre eso se puede encontrar aquí

SashaN
fuente
2

Aquí hay otro módulo para su consideración. Parece una opción viable para aplicaciones más exigentes.

Py-notify es un paquete de Python que proporciona herramientas para implementar el patrón de programación Observer. Estas herramientas incluyen señales, condiciones y variables.

Las señales son listas de controladores que se llaman cuando se emite la señal. Las condiciones son básicamente variables booleanas junto con una señal que se emite cuando cambia el estado de la condición. Se pueden combinar utilizando operadores lógicos estándar (no, y, etc.) en condiciones compuestas. Las variables, a diferencia de las condiciones, pueden contener cualquier objeto de Python, no solo booleanos, sino que no se pueden combinar.

Josip
fuente
1
La página de inicio está fuera de servicio para esta, ¿tal vez ya no sea compatible?
David Parks el
1

Si desea hacer cosas más complicadas como fusionar eventos o reintentar, puede usar el patrón Observable y una biblioteca madura que lo implemente. https://github.com/ReactiveX/RxPY . Los observables son muy comunes en Javascript y Java y son muy convenientes de usar para algunas tareas asíncronas.

from rx import Observable, Observer


def push_five_strings(observer):
        observer.on_next("Alpha")
        observer.on_next("Beta")
        observer.on_next("Gamma")
        observer.on_next("Delta")
        observer.on_next("Epsilon")
        observer.on_completed()


class PrintObserver(Observer):

    def on_next(self, value):
        print("Received {0}".format(value))

    def on_completed(self):
        print("Done!")

    def on_error(self, error):
        print("Error Occurred: {0}".format(error))

source = Observable.create(push_five_strings)

source.subscribe(PrintObserver())

SALIDA :

Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Done!
David Dehghan
fuente
1

Si necesita un bus de eventos que funcione a través de los límites del proceso o la red, puede probar PyMQ . Actualmente es compatible con pub / sub, colas de mensajes y RPC síncrono. La versión predeterminada funciona sobre un backend de Redis, por lo que necesita un servidor Redis en ejecución. También hay un back-end en memoria para realizar pruebas. También puedes escribir tu propio backend.

import pymq

# common code
class MyEvent:
    pass

# subscribe code
@pymq.subscriber
def on_event(event: MyEvent):
    print('event received')

# publisher code
pymq.publish(MyEvent())

# you can also customize channels
pymq.subscribe(on_event, channel='my_channel')
pymq.publish(MyEvent(), channel='my_channel')

Para inicializar el sistema:

from pymq.provider.redis import RedisConfig

# starts a new thread with a Redis event loop
pymq.init(RedisConfig())

# main application control loop

pymq.shutdown()

Descargo de responsabilidad: soy el autor de esta biblioteca

Thrau
fuente
0

Puedes probar buslane módulo.

Esta biblioteca facilita la implementación del sistema basado en mensajes. Admite el enfoque de comandos (controlador único) y eventos (0 o controladores múltiples). Buslane utiliza anotaciones de tipo Python para registrar correctamente el controlador.

Ejemplo simple:

from dataclasses import dataclass

from buslane.commands import Command, CommandHandler, CommandBus


@dataclass(frozen=True)
class RegisterUserCommand(Command):
    email: str
    password: str


class RegisterUserCommandHandler(CommandHandler[RegisterUserCommand]):

    def handle(self, command: RegisterUserCommand) -> None:
        assert command == RegisterUserCommand(
            email='[email protected]',
            password='secret',
        )


command_bus = CommandBus()
command_bus.register(handler=RegisterUserCommandHandler())
command_bus.execute(command=RegisterUserCommand(
    email='[email protected]',
    password='secret',
))

Para instalar buslane, simplemente use pip:

$ pip install buslane
Konrad Hałas
fuente
0

Hace algún tiempo escribí una biblioteca que podría ser útil para usted. Le permite tener oyentes locales y globales, múltiples formas diferentes de registrarlos, prioridad de ejecución, etc.

from pyeventdispatcher import register

register("foo.bar", lambda event: print("second"))
register("foo.bar", lambda event: print("first "), -100)

dispatch(Event("foo.bar", {"id": 1}))
# first second

Echa un vistazo pyeventdispatcher

Daniel Ancuta
fuente