¿Qué es un buen algoritmo de limitación de velocidad?

155

Podría usar algún pseudocódigo, o mejor, Python. Estoy tratando de implementar una cola de limitación de velocidad para un robot Python IRC, y funciona parcialmente, pero si alguien activa menos mensajes que el límite (por ejemplo, el límite de velocidad es de 5 mensajes por 8 segundos y la persona solo activa 4), y el siguiente desencadenante es durante los 8 segundos (por ejemplo, 16 segundos después), el bot envía el mensaje, pero la cola se llena y el bot espera 8 segundos, aunque no es necesario ya que el período de 8 segundos ha transcurrido.

miniman
fuente

Respuestas:

231

Aquí el algoritmo más simple , si solo desea soltar los mensajes cuando llegan demasiado rápido (en lugar de ponerlos en cola, lo que tiene sentido porque la cola podría ser arbitrariamente grande):

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    discard_message();
  else:
    forward_message();
    allowance -= 1.0;

No hay estructuras de datos, temporizadores, etc. en esta solución y funciona de manera limpia :) Para ver esto, la 'asignación' crece a una velocidad de 5/8 unidades por segundo como máximo, es decir, como máximo cinco unidades por ocho segundos. Cada mensaje que se reenvía deduce una unidad, por lo que no puede enviar más de cinco mensajes por cada ocho segundos.

Tenga en cuenta que ratedebe ser un número entero, es decir, sin una parte decimal distinta de cero, o el algoritmo no funcionará correctamente (la tasa real no será rate/per). Por ejemplo rate=0.5; per=1.0;, no funciona porque allowancenunca crecerá a 1.0. Pero rate=1.0; per=2.0;funciona bien.

Antti Huima
fuente
44
También vale la pena señalar que la dimensión y la escala de 'time_passed' deben ser las mismas que 'per', por ejemplo, segundos.
skaffman
2
Hola skaffman, gracias por los cumplidos --- lo eché de mi manga, pero con el 99,9% de probabilidad de que alguien ha anterior se le ocurrió una solución similar :)
Antti Huima
52
Ese es un algoritmo estándar: es un depósito de tokens, sin cola. El cubo es allowance. El tamaño del cubo es rate. La allowance += …línea es una optimización de agregar un token a cada velocidad ÷ por segundo.
derobert
55
@zwirbeltier Lo que escribes arriba no es cierto. 'Permitir' siempre está limitado por 'velocidad' (mire la línea "// acelerador"), por lo que solo permitirá una ráfaga de mensajes exactos de "velocidad" en cualquier momento en particular, es decir 5.
Antti Huima
8
Esto es bueno, pero puede superar la tasa. Digamos que en el momento 0 reenvía 5 mensajes, luego en el momento N * (8/5) para N = 1, 2, ... puede enviar otro mensaje, lo que resulta en más de 5 mensajes en un período de 8 segundos
mindvirus
48

Use este decorador @RateLimited (ratepersec) antes de su función que pone en cola.

Básicamente, esto verifica si han transcurrido 1 / tasa de segundos desde la última vez y si no, espera el resto del tiempo, de lo contrario no espera. Esto efectivamente lo limita a la tasa / seg. El decorador se puede aplicar a cualquier función que desee con velocidad limitada.

En su caso, si desea un máximo de 5 mensajes por 8 segundos, use @RateLimited (0.625) antes de su función sendToQueue.

import time

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args,**kargs):
            elapsed = time.clock() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait>0:
                time.sleep(leftToWait)
            ret = func(*args,**kargs)
            lastTimeCalled[0] = time.clock()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    print num

if __name__ == "__main__":
    print "This should print 1,2,3... at about 2 per second."
    for i in range(1,100):
        PrintNumber(i)
Carlos A. Ibarra
fuente
Me gusta la idea de usar un decorador para este propósito. ¿Por qué lastTimeCalled es una lista? Además, dudo que esto funcione cuando varios subprocesos llaman a la misma función RateLimited ...
Stephan202
8
Es una lista porque los tipos simples como flotante son constantes cuando son capturados por un cierre. Al convertirlo en una lista, la lista es constante, pero su contenido no lo es. Sí, no es seguro para subprocesos, pero eso se puede arreglar fácilmente con bloqueos.
Carlos A. Ibarra
time.clock()no tiene suficiente resolución en mi sistema, así que time.time()
adapté
3
Para limitar la velocidad, definitivamente no desea usar time.clock(), que mide el tiempo transcurrido de la CPU. El tiempo de CPU puede correr mucho más rápido o mucho más lento que el tiempo "real". En su time.time()lugar, desea usar , que mide el tiempo de la pared (tiempo "real").
John Wiseman
1
Por cierto, para sistemas de producción reales: implementar una limitación de velocidad con una llamada sleep () podría no ser una buena idea ya que va a bloquear el hilo y, por lo tanto, evitará que otro cliente lo use.
Maresh
28

Un Token Bucket es bastante simple de implementar.

Comience con un cubo con 5 fichas.

Cada 5/8 segundos: si el cubo tiene menos de 5 tokens, agrega uno.

Cada vez que desee enviar un mensaje: si el depósito tiene ≥1 ficha, saque una ficha y envíe el mensaje. De lo contrario, espere / suelte el mensaje / lo que sea.

(obviamente, en el código real, usaría un contador entero en lugar de tokens reales y puede optimizar cada paso de 5/8 almacenando marcas de tiempo)


Leyendo la pregunta nuevamente, si el límite de velocidad se restablece completamente cada 8 segundos, entonces aquí hay una modificación:

Comience con una marca de tiempo, last_sendhace mucho tiempo (por ejemplo, en la época). Además, comience con el mismo cubo de 5 fichas.

Aplica la regla cada 5/8 segundos.

Cada vez que envía un mensaje: Primero, verifique si hace last_send≥ 8 segundos. Si es así, llene el cubo (configúrelo en 5 fichas). En segundo lugar, si hay tokens en el depósito, envíe el mensaje (de lo contrario, soltar / esperar / etc.). Tercero, listo last_sendpara ahora.

Eso debería funcionar para ese escenario.


De hecho, he escrito un bot IRC usando una estrategia como esta (el primer enfoque). Está en Perl, no en Python, pero aquí hay un código para ilustrar:

La primera parte aquí maneja agregar tokens al cubo. Puede ver la optimización de agregar tokens en función del tiempo (de la segunda a la última línea) y luego la última línea sujeta el contenido del depósito al máximo (MESSAGE_BURST)

    my $start_time = time;
    ...
    # Bucket handling
    my $bucket = $conn->{fujiko_limit_bucket};
    my $lasttx = $conn->{fujiko_limit_lasttx};
    $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
    ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;

$ conn es una estructura de datos que se pasa. Esto está dentro de un método que se ejecuta de manera rutinaria (calcula cuándo será la próxima vez que tenga algo que hacer y duerme tanto tiempo o hasta que obtenga tráfico de red). La siguiente parte del método maneja el envío. Es bastante complicado, porque los mensajes tienen prioridades asociadas con ellos.

    # Queue handling. Start with the ultimate queue.
    my $queues = $conn->{fujiko_queues};
    foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
            # Ultimate is special. We run ultimate no matter what. Even if
            # it sends the bucket negative.
            --$bucket;
            $entry->{code}(@{$entry->{args}});
    }
    $queues->[PRIORITY_ULTIMATE] = [];

Esa es la primera cola, que se ejecuta sin importar qué. Incluso si se mata nuestra conexión por inundación. Se usa para cosas extremadamente importantes, como responder al PING del servidor. A continuación, el resto de las colas:

    # Continue to the other queues, in order of priority.
    QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
            my $queue = $queues->[$pri];
            while (scalar(@$queue)) {
                    if ($bucket < 1) {
                            # continue later.
                            $need_more_time = 1;
                            last QRUN;
                    } else {
                            --$bucket;
                            my $entry = shift @$queue;
                            $entry->{code}(@{$entry->{args}});
                    }
            }
    }

Finalmente, el estado del depósito se guarda nuevamente en la estructura de datos $ conn (en realidad un poco más adelante en el método; primero calcula qué tan pronto tendrá más trabajo)

    # Save status.
    $conn->{fujiko_limit_bucket} = $bucket;
    $conn->{fujiko_limit_lasttx} = $start_time;

Como puede ver, el código real de manejo de la cubeta es muy pequeño, aproximadamente cuatro líneas. El resto del código es manejo prioritario de colas. El bot tiene colas prioritarias para que, por ejemplo, alguien que chatea con él no pueda evitar que realice sus importantes tareas de patada / prohibición.

derobert
fuente
Me estoy perdiendo algo ... parece que esto limitaría a 1 mensaje cada 8 segundos después de obtener a través de la primera 5
chills42
@ chills42: Sí, leí mal la pregunta ... vea la segunda mitad de la respuesta.
derobert
@chills: si last_send es <8 segundos, no agrega ninguna ficha al cubo. Si su depósito contiene tokens, puede enviar el mensaje; de lo contrario no puede (ya ha enviado 5 mensajes en los últimos 8 segundos)
derobert
3
Agradecería que la gente que votó por esto explique por qué ... Me gustaría solucionar cualquier problema que vea, ¡pero eso es difícil de hacer sin comentarios!
derobert
10

para bloquear el procesamiento hasta que se pueda enviar el mensaje, haciendo cola más mensajes, la hermosa solución de antti también se puede modificar de esta manera:

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    time.sleep( (1-allowance) * (per/rate))
    forward_message();
    allowance = 0.0;
  else:
    forward_message();
    allowance -= 1.0;

solo espera hasta que haya suficiente margen para enviar el mensaje. para no comenzar con dos veces la tasa, la asignación también puede inicializarse con 0.

san
fuente
55
Cuando duermes (1-allowance) * (per/rate), debes agregar esa misma cantidad last_check.
Alp
2

Mantenga la hora en que se enviaron las últimas cinco líneas. Mantenga los mensajes en cola hasta que el quinto mensaje más reciente (si existe) haya pasado al menos 8 segundos (con last_five como un conjunto de veces):

now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
    last_five.insert(0, now)
    send_message(msg)
if len(last_five) > 5:
    last_five.pop()
pesto
fuente
No desde que lo revisaste, no lo estoy.
Pesto
Está almacenando cinco marcas de tiempo y desplazándolas repetidamente a través de la memoria (o haciendo operaciones de listas vinculadas). Estoy almacenando un contador entero y una marca de tiempo. Y solo haciendo aritmética y asignar.
derobert
2
Excepto que el mío funcionará mejor si intenta enviar 5 líneas, pero solo se permiten 3 más en el período de tiempo. El tuyo permitirá enviar los primeros tres y forzará una espera de 8 segundos antes de enviar 4 y 5. El mío permitirá que 4 y 5 se envíen 8 segundos después de la cuarta y quinta líneas más recientes.
Pesto
1
Pero sobre el tema, el rendimiento podría mejorarse mediante el uso de una lista circular enlazada de longitud 5, apuntando al quinto envío más reciente, sobrescribiéndolo en el nuevo envío y moviendo el puntero hacia adelante.
Pesto
para un bot irc con una velocidad limitadora de velocidad no es un problema. Prefiero la solución de la lista, ya que es más legible. la respuesta que se ha dado es confusa debido a la revisión, pero tampoco tiene nada de malo.
jheriko
2

Una solución es adjuntar una marca de tiempo a cada elemento de la cola y descartar el elemento después de que hayan pasado 8 segundos. Puede realizar esta verificación cada vez que se agrega la cola.

Esto solo funciona si limita el tamaño de la cola a 5 y descarta cualquier adición mientras la cola está llena.

jheriko
fuente
1

Si alguien todavía está interesado, utilizo esta simple clase invocable junto con un almacenamiento de valor de clave LRU cronometrado para limitar la tasa de solicitud por IP. Utiliza una deque, pero puede reescribirse para usarse con una lista en su lugar.

from collections import deque
import time


class RateLimiter:
    def __init__(self, maxRate=5, timeUnit=1):
        self.timeUnit = timeUnit
        self.deque = deque(maxlen=maxRate)

    def __call__(self):
        if self.deque.maxlen == len(self.deque):
            cTime = time.time()
            if cTime - self.deque[0] > self.timeUnit:
                self.deque.append(cTime)
                return False
            else:
                return True
        self.deque.append(time.time())
        return False

r = RateLimiter()
for i in range(0,100):
    time.sleep(0.1)
    print(i, "block" if r() else "pass")
sanyi
fuente
1

Solo una implementación en Python de un código de respuesta aceptada.

import time

class Object(object):
    pass

def get_throttler(rate, per):
    scope = Object()
    scope.allowance = rate
    scope.last_check = time.time()
    def throttler(fn):
        current = time.time()
        time_passed = current - scope.last_check;
        scope.last_check = current;
        scope.allowance = scope.allowance + time_passed * (rate / per)
        if (scope.allowance > rate):
          scope.allowance = rate
        if (scope.allowance < 1):
          pass
        else:
          fn()
          scope.allowance = scope.allowance - 1
    return throttler
Hodza
fuente
Me han sugerido que le sugiera que agregue un ejemplo de uso de su código .
Luc
0

Qué tal esto:

long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;

private boolean isRateLimited(int msgs_per_sec) {
    if (System.currentTimeMillis() - check_time > 1000) {
        check_time = System.currentTimeMillis();
        msgs_sent_count = 0;
    }

    if (msgs_sent_count > (msgs_per_sec - 1)) {
        return true;
    } else {
        msgs_sent_count++;
    }

    return false;
}

fuente
0

Necesitaba una variación en Scala. Aquí está:

case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A  B) extends (A  B) {

  import Thread.sleep
  private def now = System.currentTimeMillis / 1000.0
  private val (calls, sec) = callsPerSecond
  private var allowance  = 1.0
  private var last = now

  def apply(a: A): B = {
    synchronized {
      val t = now
      val delta_t = t - last
      last = t
      allowance += delta_t * (calls / sec)
      if (allowance > calls)
        allowance = calls
      if (allowance < 1d) {
        sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
      }
      allowance -= 1
    }
    f(a)
  }

}

Así es como se puede usar:

val f = Limiter((5d, 8d), { 
  _: Unit  
    println(System.currentTimeMillis) 
})
while(true){f(())}
Landon Kuhn
fuente