Media móvil o media móvil

195

¿Existe una función SciPy o una función o módulo NumPy para Python que calcule la media de ejecución de una matriz 1D dada una ventana específica?

Shejo284
fuente

Respuestas:

24

Para una solución corta y rápida que hace todo en un bucle, sin dependencias, el siguiente código funciona muy bien.

mylist = [1, 2, 3, 4, 5, 6, 7]
N = 3
cumsum, moving_aves = [0], []

for i, x in enumerate(mylist, 1):
    cumsum.append(cumsum[i-1] + x)
    if i>=N:
        moving_ave = (cumsum[i] - cumsum[i-N])/N
        #can do stuff with moving_ave here
        moving_aves.append(moving_ave)
Aikude
fuente
46
¡¿Rápido?! Esta solución es un orden de magnitud más lenta que las soluciones con Numpy.
Bart
3
Aunque esta solución nativa es genial, el OP solicitó una función numpy / scipy, presumiblemente esas serán considerablemente más rápidas.
Demis
256

UPD: Alleo y jasaarim han propuesto soluciones más eficientes .


Puedes usar np.convolvepara eso:

np.convolve(x, np.ones((N,))/N, mode='valid')

Explicación

La media de ejecución es un caso de la operación matemática de convolución . Para la media de ejecución, desliza una ventana a lo largo de la entrada y calcula la media del contenido de la ventana. Para señales 1D discretas, la convolución es lo mismo, excepto que, en lugar de la media, calcula una combinación lineal arbitraria, es decir, multiplique cada elemento por un coeficiente correspondiente y sume los resultados. Esos coeficientes, uno para cada posición en la ventana, a veces se llaman la convolución del núcleo . Ahora, la media aritmética de los valores de N es (x_1 + x_2 + ... + x_N) / N, entonces el núcleo correspondiente es (1/N, 1/N, ..., 1/N), y eso es exactamente lo que obtenemos al usar np.ones((N,))/N.

Bordes

El modeargumento de np.convolveespecifica cómo manejar los bordes. Elegí el validmodo aquí porque creo que así es como la mayoría de la gente espera que funcione la carrera, pero es posible que tengas otras prioridades. Aquí hay una gráfica que ilustra la diferencia entre los modos:

import numpy as np
import matplotlib.pyplot as plt
modes = ['full', 'same', 'valid']
for m in modes:
    plt.plot(np.convolve(np.ones((200,)), np.ones((50,))/50, mode=m));
plt.axis([-10, 251, -.1, 1.1]);
plt.legend(modes, loc='lower center');
plt.show()

Ejecución de modos medios de convolución

lapislázuli
fuente
55
Me gusta esta solución porque es limpia (una línea) y relativamente eficiente (trabajo realizado dentro de numpy). Pero el uso de la "Solución eficiente" de Alleo numpy.cumsumtiene una mayor complejidad.
Ulrich Stern
2
@denfromufa, creo que la documentación cubre la implementación lo suficientemente bien, y también enlaza con Wikipedia, que explica las matemáticas. Teniendo en cuenta el enfoque de la pregunta, ¿cree que esta respuesta debe copiarlas?
lapis
@lapis el uso de convolve para la media móvil es bastante inusual y no obvio. Aquí está la mejor explicación visual que he encontrado: matlabtricks.com/post-11/moving-average-by-convolution
denfromufa
Para trazar y tareas relacionadas, sería útil llenarlo con valores Ninguno. Mi sugerencia (no tan hermosa pero corta): `` def moving_average (x, N, fill = True): return np.concatenate ([x for x in [[None] * (N // 2 + N% 2) * fill, np.convolve (x, np.ones ((N,)) / N, mode = 'valid'), [None] * (N // 2) * fill,] if len (x)]) ` `` El código se ve tan feo en los comentarios SO xD No quería agregar otra respuesta, ya que había muchas, pero podría copiarlo y pegarlo en su IDE.
Chaoste
146

Solución eficiente

La convolución es mucho mejor que el enfoque directo, pero (supongo) usa FFT y, por lo tanto, es bastante lenta. Sin embargo, especialmente para calcular el funcionamiento, el siguiente enfoque funciona bien

def running_mean(x, N):
    cumsum = numpy.cumsum(numpy.insert(x, 0, 0)) 
    return (cumsum[N:] - cumsum[:-N]) / float(N)

El código para verificar

In[3]: x = numpy.random.random(100000)
In[4]: N = 1000
In[5]: %timeit result1 = numpy.convolve(x, numpy.ones((N,))/N, mode='valid')
10 loops, best of 3: 41.4 ms per loop
In[6]: %timeit result2 = running_mean(x, N)
1000 loops, best of 3: 1.04 ms per loop

Tenga en cuenta que numpy.allclose(result1, result2)es True, dos métodos son equivalentes. A mayor N, mayor diferencia en el tiempo.

advertencia: aunque cumsum es más rápido, habrá un mayor error de coma flotante que puede causar que sus resultados sean inválidos / incorrectos / inaceptables

los comentarios señalaron este problema de error de coma flotante aquí, pero lo estoy haciendo más obvio aquí en la respuesta. .

# demonstrate loss of precision with only 100,000 points
np.random.seed(42)
x = np.random.randn(100000)+1e6
y1 = running_mean_convolve(x, 10)
y2 = running_mean_cumsum(x, 10)
assert np.allclose(y1, y2, rtol=1e-12, atol=0)
  • cuantos más puntos acumule, mayor será el error de coma flotante (por lo tanto, 1e5 puntos es notable, 1e6 puntos es más significativo, más de 1e6 y es posible que desee restablecer los acumuladores)
  • puede hacer trampa usando np.longdoublepero su error de coma flotante seguirá siendo significativo para un número relativamente grande de puntos (alrededor de> 1e5 pero depende de sus datos)
  • puede trazar el error y verlo aumentar relativamente rápido
  • la solución convolve es más lenta pero no tiene esta pérdida de precisión de coma flotante
  • la solución uniform_filter1d es más rápida que esta solución cumsum Y no tiene esta pérdida de precisión de coma flotante
Alleo
fuente
3
Buena solución! Mi presentimiento es numpy.convolveO (mn); sus documentos mencionan que scipy.signal.fftconvolveusa FFT.
Ulrich Stern
3
Este método no trata con los bordes de la matriz, ¿verdad?
JoVe
66
Buena solución, pero tenga en cuenta que puede sufrir errores numéricos para matrices grandes, ya que hacia el final de la matriz, puede restar dos números grandes para obtener un resultado pequeño.
Bas Swinckels
1
Esto usa la división entera en lugar de la división flotante: running_mean([1,2,3], 2)da array([1, 2]). Reemplazar xpor [float(value) for value in x]hace el truco.
ChrisW
44
La estabilidad numérica de esta solución puede convertirse en un problema si xcontiene flotadores. Ejemplo: running_mean(np.arange(int(1e7))[::-1] + 0.2, 1)[-1] - 0.2regresa 0.003125mientras uno espera 0.0. Más información: en.wikipedia.org/wiki/Loss_of_significance
Milán
80

Actualización: el siguiente ejemplo muestra la pandas.rolling_meanfunción anterior que se ha eliminado en versiones recientes de pandas. Un equivalente moderno de la llamada a la función a continuación sería

In [8]: pd.Series(x).rolling(window=N).mean().iloc[N-1:].values
Out[8]: 
array([ 0.49815397,  0.49844183,  0.49840518, ...,  0.49488191,
        0.49456679,  0.49427121])

pandas es más adecuado para esto que NumPy o SciPy. Su función rolling_mean hace el trabajo convenientemente. También devuelve una matriz NumPy cuando la entrada es una matriz.

Es difícil superar el rolling_meanrendimiento con cualquier implementación personalizada de Python puro. Aquí hay un ejemplo de rendimiento contra dos de las soluciones propuestas:

In [1]: import numpy as np

In [2]: import pandas as pd

In [3]: def running_mean(x, N):
   ...:     cumsum = np.cumsum(np.insert(x, 0, 0)) 
   ...:     return (cumsum[N:] - cumsum[:-N]) / N
   ...:

In [4]: x = np.random.random(100000)

In [5]: N = 1000

In [6]: %timeit np.convolve(x, np.ones((N,))/N, mode='valid')
10 loops, best of 3: 172 ms per loop

In [7]: %timeit running_mean(x, N)
100 loops, best of 3: 6.72 ms per loop

In [8]: %timeit pd.rolling_mean(x, N)[N-1:]
100 loops, best of 3: 4.74 ms per loop

In [9]: np.allclose(pd.rolling_mean(x, N)[N-1:], running_mean(x, N))
Out[9]: True

También hay buenas opciones sobre cómo lidiar con los valores de borde.

jasaarim
fuente
66
Pandas rolling_mean es una buena herramienta para el trabajo, pero ha quedado en desuso para ndarrays. En futuros lanzamientos de Pandas solo funcionará en la serie Pandas. ¿A dónde recurrimos ahora para los datos de matriz que no son Pandas?
Mike
55
@Mike rolling_mean () está en desuso, pero ahora puede usar rolling y mean por separado: df.rolling(windowsize).mean()ahora funciona en su lugar (muy rápidamente podría agregar). para 6,000 series de filas %timeit test1.rolling(20).mean()devolvió 1000 bucles, lo mejor de 3: 1.16 ms por bucle
Vlox
55
@Vlox df.rolling()funciona lo suficientemente bien, el problema es que incluso este formulario no admitirá ndarrays en el futuro. Para usarlo, primero tendremos que cargar nuestros datos en un marco de datos de Pandas. Me encantaría ver esta función agregada a cualquiera numpyo scipy.signal.
Mike
1
@ Mike está totalmente de acuerdo. Estoy luchando en particular para hacer coincidir la velocidad de los pandas .ewm (). Mean () para mis propias matrices (en lugar de tener que cargarlas primero en un df). Quiero decir, es genial que sea rápido, pero se siente un poco torpe al entrar y salir de los marcos de datos con demasiada frecuencia.
Vlox
66
%timeit bottleneck.move_mean(x, N)es de 3 a 15 veces más rápido que los métodos cumsum y pandas en mi pc. Echa un vistazo a su punto de referencia en el archivo README del repositorio .
MAB
50

Puede calcular una media de ejecución con:

import numpy as np

def runningMean(x, N):
    y = np.zeros((len(x),))
    for ctr in range(len(x)):
         y[ctr] = np.sum(x[ctr:(ctr+N)])
    return y/N

Pero es lento.

Afortunadamente, numpy incluye una función de convolución que podemos usar para acelerar las cosas. La media de ejecución es equivalente a convolucionarse xcon un vector que es Nlargo, con todos los miembros iguales a 1/N. La implementación numpy de convolve incluye el transitorio inicial, por lo que debe eliminar los primeros puntos N-1:

def runningMeanFast(x, N):
    return np.convolve(x, np.ones((N,))/N)[(N-1):]

En mi máquina, la versión rápida es 20-30 veces más rápida, dependiendo de la longitud del vector de entrada y el tamaño de la ventana de promedio.

Tenga en cuenta que convolve incluye un 'same'modo que parece que debería abordar el problema transitorio inicial, pero lo divide entre el principio y el final.

mtrw
fuente
Tenga en cuenta que eliminar los primeros puntos N-1 todavía deja un efecto de límite en los últimos puntos. Una forma más fácil de resolver el problema es usarlo mode='valid'en un proceso convolveque no requiera ningún procesamiento posterior.
lapis
1
@Psycho: mode='valid'elimina el transitorio de ambos extremos, ¿verdad? Si len(x)=10y N=4, para una media en ejecución, quisiera 10 resultados pero validdevuelve 7.
mtrw
1
Elimina el transitorio del final, y el principio no tiene uno. Bueno, supongo que es una cuestión de prioridades, no necesito el mismo número de resultados a expensas de obtener una pendiente hacia cero que no existe en los datos. Por cierto, aquí hay un comando para mostrar la diferencia entre los modos: modes = ('full', 'same', 'valid'); [plot(convolve(ones((200,)), ones((50,))/50, mode=m)) for m in modes]; axis([-10, 251, -.1, 1.1]); legend(modes, loc='lower center')(con pyplot y numpy importado).
lapis
runningMeanTengo el efecto secundario de promediar con ceros, cuando sales de la matriz con x[ctr:(ctr+N)]el lado derecho de la matriz.
mrgloom
runningMeanFastTambién tiene este problema de efecto de borde.
mrgloom
22

o módulo para python que calcula

en mis pruebas en Tradewave.net TA-lib siempre gana:

import talib as ta
import numpy as np
import pandas as pd
import scipy
from scipy import signal
import time as t

PAIR = info.primary_pair
PERIOD = 30

def initialize():
    storage.reset()
    storage.elapsed = storage.get('elapsed', [0,0,0,0,0,0])

def cumsum_sma(array, period):
    ret = np.cumsum(array, dtype=float)
    ret[period:] = ret[period:] - ret[:-period]
    return ret[period - 1:] / period

def pandas_sma(array, period):
    return pd.rolling_mean(array, period)

def api_sma(array, period):
    # this method is native to Tradewave and does NOT return an array
    return (data[PAIR].ma(PERIOD))

def talib_sma(array, period):
    return ta.MA(array, period)

def convolve_sma(array, period):
    return np.convolve(array, np.ones((period,))/period, mode='valid')

def fftconvolve_sma(array, period):    
    return scipy.signal.fftconvolve(
        array, np.ones((period,))/period, mode='valid')    

def tick():

    close = data[PAIR].warmup_period('close')

    t1 = t.time()
    sma_api = api_sma(close, PERIOD)
    t2 = t.time()
    sma_cumsum = cumsum_sma(close, PERIOD)
    t3 = t.time()
    sma_pandas = pandas_sma(close, PERIOD)
    t4 = t.time()
    sma_talib = talib_sma(close, PERIOD)
    t5 = t.time()
    sma_convolve = convolve_sma(close, PERIOD)
    t6 = t.time()
    sma_fftconvolve = fftconvolve_sma(close, PERIOD)
    t7 = t.time()

    storage.elapsed[-1] = storage.elapsed[-1] + t2-t1
    storage.elapsed[-2] = storage.elapsed[-2] + t3-t2
    storage.elapsed[-3] = storage.elapsed[-3] + t4-t3
    storage.elapsed[-4] = storage.elapsed[-4] + t5-t4
    storage.elapsed[-5] = storage.elapsed[-5] + t6-t5    
    storage.elapsed[-6] = storage.elapsed[-6] + t7-t6        

    plot('sma_api', sma_api)  
    plot('sma_cumsum', sma_cumsum[-5])
    plot('sma_pandas', sma_pandas[-10])
    plot('sma_talib', sma_talib[-15])
    plot('sma_convolve', sma_convolve[-20])    
    plot('sma_fftconvolve', sma_fftconvolve[-25])

def stop():

    log('ticks....: %s' % info.max_ticks)

    log('api......: %.5f' % storage.elapsed[-1])
    log('cumsum...: %.5f' % storage.elapsed[-2])
    log('pandas...: %.5f' % storage.elapsed[-3])
    log('talib....: %.5f' % storage.elapsed[-4])
    log('convolve.: %.5f' % storage.elapsed[-5])    
    log('fft......: %.5f' % storage.elapsed[-6])

resultados:

[2015-01-31 23:00:00] ticks....: 744
[2015-01-31 23:00:00] api......: 0.16445
[2015-01-31 23:00:00] cumsum...: 0.03189
[2015-01-31 23:00:00] pandas...: 0.03677
[2015-01-31 23:00:00] talib....: 0.00700  # <<< Winner!
[2015-01-31 23:00:00] convolve.: 0.04871
[2015-01-31 23:00:00] fft......: 0.22306

ingrese la descripción de la imagen aquí

presencia lite
fuente
NameError: name 'info' is not defined. Recibo este error, señor.
Md. Rezwanul Haque
1
Parece que las series de tiempo cambian después del suavizado, ¿es el efecto deseado?
mrgloom el
@mrgloom sí, para fines de visualización; de lo contrario, aparecerían como una línea en el gráfico; Md. Rezwanul Haque puede eliminar todas las referencias a PAIR e información; aquellos eran métodos de espacio aislado internos para la tradewave.net ahora difunto
litepresence
21

Para obtener una solución lista para usar, consulte https://scipy-cookbook.readthedocs.io/items/SignalSmooth.html . Proporciona promedio de ejecución con el flattipo de ventana. Tenga en cuenta que esto es un poco más sofisticado que el simple método de convolución “hágalo usted mismo”, ya que trata de manejar los problemas al principio y al final de los datos reflejándolos (lo que puede o no funcionar en su caso. ..).

Para empezar, puedes probar:

a = np.random.random(100)
plt.plot(a)
b = smooth(a, window='flat')
plt.plot(b)
Hansemann
fuente
1
Este método se basa en numpy.convolvela diferencia solo para alterar la secuencia.
Alleo
10
Siempre me molesta la función de procesamiento de señales que devuelve señales de salida de diferente forma que las señales de entrada cuando ambas entradas y salidas son de la misma naturaleza (por ejemplo, ambas señales temporales). Rompe la correspondencia con la variable independiente relacionada (p. Ej., Tiempo, frecuencia) haciendo que el trazado o la comparación no sea una cuestión directa ... de todos modos, si comparte el sentimiento, es posible que desee cambiar las últimas líneas de la función propuesta como y = np .convolve (w / w.sum (), s, mode = 'same'); return y [window_len-1 :-( window_len-1)]
Christian O'Reilly
@ ChristianO'Reilly, debe publicar eso como una respuesta separada, eso es exactamente lo que estaba buscando, ya que de hecho tengo otras dos matrices que tienen que coincidir con las longitudes de los datos suavizados, para trazar, etc. Me gustaría saber exactamente cómo lo hizo: ¿es wel tamaño de la ventana y slos datos?
Demis
@Demis Me alegra que el comentario haya ayudado. Más información sobre la función nuvoly convolve aquí docs.scipy.org/doc/numpy-1.15.0/reference/generated/… Una función de convolución ( en.wikipedia.org/wiki/Convolution ) involucra dos señales entre sí. En este caso, involucra tus señales con una ventana normalizada (es decir, área unitaria) (w / w.sum ()).
Christian O'Reilly
21

Puede usar scipy.ndimage.filters.uniform_filter1d :

import numpy as np
from scipy.ndimage.filters import uniform_filter1d
N = 1000
x = np.random.random(100000)
y = uniform_filter1d(x, size=N)

uniform_filter1d:

  • da la salida con la misma forma numpy (es decir, número de puntos)
  • permite múltiples formas de manejar el borde donde 'reflect'está el valor predeterminado, pero en mi caso, prefería'nearest'

También es bastante rápido (casi 50 veces más rápido que np.convolvey 2-5 veces más rápido que el enfoque cumsum dado anteriormente ):

%timeit y1 = np.convolve(x, np.ones((N,))/N, mode='same')
100 loops, best of 3: 9.28 ms per loop

%timeit y2 = uniform_filter1d(x, size=N)
10000 loops, best of 3: 191 µs per loop

Aquí hay 3 funciones que le permiten comparar el error / velocidad de diferentes implementaciones:

from __future__ import division
import numpy as np
import scipy.ndimage.filters as ndif
def running_mean_convolve(x, N):
    return np.convolve(x, np.ones(N) / float(N), 'valid')
def running_mean_cumsum(x, N):
    cumsum = np.cumsum(np.insert(x, 0, 0))
    return (cumsum[N:] - cumsum[:-N]) / float(N)
def running_mean_uniform_filter1d(x, N):
    return ndif.uniform_filter1d(x, N, mode='constant', origin=-(N//2))[:-(N-1)]
moi
fuente
1
Esta es la única respuesta que parece tener en cuenta los problemas fronterizos (bastante importante, especialmente al trazar). ¡Gracias!
Gabriel
1
Perfilé uniform_filter1d, np.convolvecon un rectángulo, y np.cumsumseguido de np.subtract. mis resultados: (1.) convolve es el más lento. (2.) cumsum / sustract es aproximadamente 20-30x más rápido. (3.) uniform_filter1d es aproximadamente 2-3 veces más rápido que cumsum / sustract. el ganador es definitivamente uniform_filter1d.
Trevor Boyd Smith
el uso uniform_filter1des más rápido que la cumsumsolución (en aproximadamente 2-5x). y uniform_filter1d no recibe un error masivo de coma flotante como lo hace lacumsum solución.
Trevor Boyd Smith
15

Sé que esta es una vieja pregunta, pero aquí hay una solución que no utiliza ninguna estructura de datos o bibliotecas adicionales. Es lineal en el número de elementos de la lista de entrada y no se me ocurre otra forma de hacerlo más eficiente (en realidad, si alguien sabe de una mejor manera de asignar el resultado, hágamelo saber).

NOTA: esto sería mucho más rápido usando una matriz numpy en lugar de una lista, pero quería eliminar todas las dependencias. También sería posible mejorar el rendimiento mediante la ejecución de subprocesos múltiples

La función supone que la lista de entrada es unidimensional, así que tenga cuidado.

### Running mean/Moving average
def running_mean(l, N):
    sum = 0
    result = list( 0 for x in l)

    for i in range( 0, N ):
        sum = sum + l[i]
        result[i] = sum / (i+1)

    for i in range( N, len(l) ):
        sum = sum - l[i-N] + l[i]
        result[i] = sum / N

    return result

Ejemplo

Suponga que tenemos una lista data = [ 1, 2, 3, 4, 5, 6 ]en la que queremos calcular una media móvil con un período de 3, y que también desea una lista de salida que tenga el mismo tamaño que la entrada (es el caso más frecuente).

El primer elemento tiene el índice 0, por lo que la media móvil debe calcularse en los elementos del índice -2, -1 y 0. Obviamente no tenemos datos [-2] y datos [-1] (a menos que desee utilizar un valor especial condiciones de contorno), por lo que suponemos que esos elementos son 0. Esto es equivalente a rellenar con cero la lista, excepto que en realidad no la rellenamos, solo hacemos un seguimiento de los índices que requieren relleno (de 0 a N-1).

Entonces, para los primeros N elementos, seguimos sumando los elementos en un acumulador.

result[0] = (0 + 0 + 1) / 3  = 0.333    ==   (sum + 1) / 3
result[1] = (0 + 1 + 2) / 3  = 1        ==   (sum + 2) / 3
result[2] = (1 + 2 + 3) / 3  = 2        ==   (sum + 3) / 3

A partir de los elementos N + 1 en adelante, la acumulación simple no funciona. esperamos result[3] = (2 + 3 + 4)/3 = 3pero esto es diferente de (sum + 4)/3 = 3.333.

La forma de calcular el valor correcto es restar data[0] = 1de sum+4, dando así sum + 4 - 1 = 9.

Esto sucede porque actualmente sum = data[0] + data[1] + data[2], pero también es cierto para todos i >= Nporque, antes de la resta, sumes data[i-N] + ... + data[i-2] + data[i-1].

Nexo
fuente
12

Siento que esto se puede resolver elegantemente usando un cuello de botella

Ver muestra básica a continuación:

import numpy as np
import bottleneck as bn

a = np.random.randint(4, 1000, size=100)
mm = bn.move_mean(a, window=5, min_count=1)
  • "mm" es la media móvil de "a".

  • "ventana" es el número máximo de entradas a considerar para la media móvil.

  • "min_count" es el número mínimo de entradas a considerar para la media móvil (por ejemplo, para los primeros elementos o si la matriz tiene valores nan).

Lo bueno es que Bottleneck ayuda a lidiar con los valores nanométricos y también es muy eficiente.

Anthony Anyanwu
fuente
Esta lib es realmente rápida. La función de media móvil pura de Python es lenta. Bootleneck es una biblioteca PyData, que creo que es estable y puede obtener soporte continuo de la comunidad Python, entonces, ¿por qué no usarla?
GoingMyWay
6

Todavía no he comprobado qué tan rápido es esto, pero podrías intentarlo:

from collections import deque

cache = deque() # keep track of seen values
n = 10          # window size
A = xrange(100) # some dummy iterable
cum_sum = 0     # initialize cumulative sum

for t, val in enumerate(A, 1):
    cache.append(val)
    cum_sum += val
    if t < n:
        avg = cum_sum / float(t)
    else:                           # if window is saturated,
        cum_sum -= cache.popleft()  # subtract oldest value
        avg = cum_sum / float(n)
Kris
fuente
1
Esto es lo que iba a hacer. ¿Alguien puede criticar por qué esta es una mala manera de hacerlo?
staggart
1
Esta solución simple de Python funcionó bien para mí sin requerir numpy. Terminé convirtiéndolo en una clase para su reutilización.
Matthew Tschiegg
6

Esta respuesta contiene soluciones que utilizan la biblioteca estándar de Python para tres escenarios diferentes.


Promedio corriente con itertools.accumulate

Esta es una solución Python 3.2+ de memoria eficiente que calcula el promedio de ejecución sobre un valor iterable de apalancamiento itertools.accumulate.

>>> from itertools import accumulate
>>> values = range(100)

Tenga en cuenta que valuespuede ser iterable, incluidos generadores o cualquier otro objeto que produzca valores sobre la marcha.

Primero, construye perezosamente la suma acumulativa de los valores.

>>> cumu_sum = accumulate(value_stream)

A continuación, enumeratela suma acumulativa (a partir de 1) y construir un generador que produce la fracción de valores acumulados y el índice de enumeración actual.

>>> rolling_avg = (accu/i for i, accu in enumerate(cumu_sum, 1))

Puede emitir means = list(rolling_avg)si necesita todos los valores en la memoria a la vez o llamar de forma nextincremental.
(Por supuesto, también puede iterar rolling_avgcon un forbucle, que llamará nextimplícitamente).

>>> next(rolling_avg) # 0/1
>>> 0.0
>>> next(rolling_avg) # (0 + 1)/2
>>> 0.5
>>> next(rolling_avg) # (0 + 1 + 2)/3
>>> 1.0

Esta solución se puede escribir como una función de la siguiente manera.

from itertools import accumulate

def rolling_avg(iterable):
    cumu_sum = accumulate(iterable)
    yield from (accu/i for i, accu in enumerate(cumu_sum, 1))
    

Una rutina a la que puede enviar valores en cualquier momento

Esta rutina consume los valores que envía y mantiene un promedio de los valores vistos hasta ahora.

Es útil cuando no tiene un valor iterable pero adquiere los valores para promediar uno por uno en diferentes momentos a lo largo de la vida de su programa.

def rolling_avg_coro():
    i = 0
    total = 0.0
    avg = None

    while True:
        next_value = yield avg
        i += 1
        total += next_value
        avg = total/i
        

La corutina funciona así:

>>> averager = rolling_avg_coro() # instantiate coroutine
>>> next(averager) # get coroutine going (this is called priming)
>>>
>>> averager.send(5) # 5/1
>>> 5.0
>>> averager.send(3) # (5 + 3)/2
>>> 4.0
>>> print('doing something else...')
doing something else...
>>> averager.send(13) # (5 + 3 + 13)/3
>>> 7.0

Calcular el promedio sobre una ventana deslizante de tamaño N

Esta función de generador toma un tamaño iterable y de ventana N y produce el promedio sobre los valores actuales dentro de la ventana. Utiliza a deque, que es una estructura de datos similar a una lista, pero optimizada para modificaciones rápidas ( pop, append) en ambos puntos finales .

from collections import deque
from itertools import islice

def sliding_avg(iterable, N):        
    it = iter(iterable)
    window = deque(islice(it, N))        
    num_vals = len(window)

    if num_vals < N:
        msg = 'window size {} exceeds total number of values {}'
        raise ValueError(msg.format(N, num_vals))

    N = float(N) # force floating point division if using Python 2
    s = sum(window)
    
    while True:
        yield s/N
        try:
            nxt = next(it)
        except StopIteration:
            break
        s = s - window.popleft() + nxt
        window.append(nxt)
        

Aquí está la función en acción:

>>> values = range(100)
>>> N = 5
>>> window_avg = sliding_avg(values, N)
>>> 
>>> next(window_avg) # (0 + 1 + 2 + 3 + 4)/5
>>> 2.0
>>> next(window_avg) # (1 + 2 + 3 + 4 + 5)/5
>>> 3.0
>>> next(window_avg) # (2 + 3 + 4 + 5 + 6)/5
>>> 4.0
timgeb
fuente
5

Llegué un poco tarde a la fiesta, pero hice mi propia pequeña función que NO envuelve los extremos o los pads con ceros que luego se usan para encontrar el promedio también. Como otro tratamiento es que también vuelve a muestrear la señal en puntos linealmente espaciados. Personalice el código a voluntad para obtener otras funciones.

El método es una simple multiplicación matricial con un núcleo gaussiano normalizado.

def running_mean(y_in, x_in, N_out=101, sigma=1):
    '''
    Returns running mean as a Bell-curve weighted average at evenly spaced
    points. Does NOT wrap signal around, or pad with zeros.

    Arguments:
    y_in -- y values, the values to be smoothed and re-sampled
    x_in -- x values for array

    Keyword arguments:
    N_out -- NoOf elements in resampled array.
    sigma -- 'Width' of Bell-curve in units of param x .
    '''
    N_in = size(y_in)

    # Gaussian kernel
    x_out = np.linspace(np.min(x_in), np.max(x_in), N_out)
    x_in_mesh, x_out_mesh = np.meshgrid(x_in, x_out)
    gauss_kernel = np.exp(-np.square(x_in_mesh - x_out_mesh) / (2 * sigma**2))
    # Normalize kernel, such that the sum is one along axis 1
    normalization = np.tile(np.reshape(sum(gauss_kernel, axis=1), (N_out, 1)), (1, N_in))
    gauss_kernel_normalized = gauss_kernel / normalization
    # Perform running average as a linear operation
    y_out = gauss_kernel_normalized @ y_in

    return y_out, x_out

Un uso simple en una señal sinusoidal con ruido distribuido normal agregado: ingrese la descripción de la imagen aquí

Clausen
fuente
Esto no funciona para mí (python 3.6). 1 No hay una función nombrada sum, en su np.sumlugar se usa 2 El @operador (no tengo idea de qué se trata) arroja un error. Puede que lo investigue más tarde, pero ahora me falta el tiempo
Bastian, el
El @es el operador de multiplicación de matrices que implementa np.matmul . Compruebe si su y_inmatriz es una matriz numpy, ese podría ser el problema.
xyzzyqed
5

En lugar de numpy o scipy, recomendaría pandas para hacer esto más rápidamente:

df['data'].rolling(3).mean()

Esto toma el promedio móvil (MA) de 3 períodos de la columna "datos". También puede calcular las versiones desplazadas, por ejemplo, la que excluye la celda actual (desplazada hacia atrás) se puede calcular fácilmente como:

df['data'].shift(periods=1).rolling(3).mean()
Gursel Karacor
fuente
¿Cómo es esto diferente de la solución propuesta en 2016 ?
Sr. T
2
La solución propuesta en 2016 usa pandas.rolling_meanmientras que la mía usa pandas.DataFrame.rolling. También puede calcular el movimiento, min(), max(), sum()etc., así como mean()con este método fácilmente.
Gursel Karacor
En el primero necesitas usar un método diferente como pandas.rolling_min, pandas.rolling_maxetc. Son similares pero diferentes.
Gursel Karacor
4

Hay un comentario de mab enterrado en una de las respuestas anteriores que tiene este método. bottlenecktiene move_meancuál es un promedio móvil simple:

import numpy as np
import bottleneck as bn

a = np.arange(10) + np.random.random(10)

mva = bn.move_mean(a, window=2, min_count=1)

min_countes un parámetro útil que básicamente llevará la media móvil hasta ese punto en su matriz. Si no establece min_count, será igual window, y todo hasta los windowpuntos será nan.

palabras por el contrario
fuente
3

Otro enfoque para encontrar el promedio móvil sin usar panda numpy

import itertools
sample = [2, 6, 10, 8, 11, 10]
list(itertools.starmap(lambda a,b: b/a, 
               enumerate(itertools.accumulate(sample), 1)))

imprimirá [2.0, 4.0, 6.0, 6.5, 7.4, 7.833333333333333]

DmitrySemenov
fuente
itertools.accumulate no existe en python 2.7, pero sí en python 3.4
grayaii
3

Esta pregunta ahora es incluso más antigua que cuando NeXuS escribió sobre ella el mes pasado, PERO me gusta cómo su código trata los casos extremos. Sin embargo, debido a que es un "promedio móvil simple", sus resultados van a la zaga de los datos a los que se aplican. Pensé que se trata de casos extremos de un modo más satisfactorio que los modos de NumPy valid, samey fullque podría lograrse mediante la aplicación de un enfoque similar al de un convolution()método basado.

Mi contribución utiliza un promedio de ejecución central para alinear sus resultados con sus datos. Cuando hay muy pocos puntos disponibles para usar la ventana de tamaño completo, los promedios de ejecución se calculan desde ventanas sucesivamente más pequeñas en los bordes de la matriz. [En realidad, desde ventanas sucesivamente más grandes, pero ese es un detalle de implementación.]

import numpy as np

def running_mean(l, N):
    # Also works for the(strictly invalid) cases when N is even.
    if (N//2)*2 == N:
        N = N - 1
    front = np.zeros(N//2)
    back = np.zeros(N//2)

    for i in range(1, (N//2)*2, 2):
        front[i//2] = np.convolve(l[:i], np.ones((i,))/i, mode = 'valid')
    for i in range(1, (N//2)*2, 2):
        back[i//2] = np.convolve(l[-i:], np.ones((i,))/i, mode = 'valid')
    return np.concatenate([front, np.convolve(l, np.ones((N,))/N, mode = 'valid'), back[::-1]])

Es relativamente lento porque usa convolve(), y probablemente podría ser mejorado por un verdadero Pythonista, sin embargo, creo que la idea sigue en pie.

marisano
fuente
3

Hay muchas respuestas anteriores sobre el cálculo de una media continua. Mi respuesta agrega dos características adicionales:

  1. ignora los valores nan
  2. calcula la media de los N valores vecinos NO incluye el valor de interés en sí

Esta segunda característica es particularmente útil para determinar qué valores difieren de la tendencia general en una cierta cantidad.

Yo uso numpy.cumsum ya que es el método más eficiente en el tiempo ( ver la respuesta de Alleo arriba ).

N=10 # number of points to test on each side of point of interest, best if even
padded_x = np.insert(np.insert( np.insert(x, len(x), np.empty(int(N/2))*np.nan), 0, np.empty(int(N/2))*np.nan ),0,0)
n_nan = np.cumsum(np.isnan(padded_x))
cumsum = np.nancumsum(padded_x) 
window_sum = cumsum[N+1:] - cumsum[:-(N+1)] - x # subtract value of interest from sum of all values within window
window_n_nan = n_nan[N+1:] - n_nan[:-(N+1)] - np.isnan(x)
window_n_values = (N - window_n_nan)
movavg = (window_sum) / (window_n_values)

Este código funciona incluso para Ns solamente. Se puede ajustar para números impares cambiando la inserción np.de padded_x y n_nan.

Ejemplo de salida (sin formato en negro, movavg en azul): datos sin procesar (negro) y promedio móvil (azul) de 10 puntos alrededor de cada valor, sin incluir ese valor.  los valores nan se ignoran.

Este código se puede adaptar fácilmente para eliminar todos los valores promedio móviles calculados a partir de un valor menor que el valor de corte = 3 valores distintos de nan.

window_n_values = (N - window_n_nan).astype(float) # dtype must be float to set some values to nan
cutoff = 3
window_n_values[window_n_values<cutoff] = np.nan
movavg = (window_sum) / (window_n_values)

datos sin procesar (negro) y promedio móvil (azul) mientras se ignora cualquier ventana con menos de 3 valores distintos de nan

gtcoder
fuente
2

Utilice solo la biblioteca estándar de Python (memoria eficiente)

Simplemente dé otra versión del uso de la biblioteca estándar dequesolamente. Es una gran sorpresa para mí que la mayoría de las respuestas estén usando pandaso numpy.

def moving_average(iterable, n=3):
    d = deque(maxlen=n)
    for i in iterable:
        d.append(i)
        if len(d) == n:
            yield sum(d)/n

r = moving_average([40, 30, 50, 46, 39, 44])
assert list(r) == [40.0, 42.0, 45.0, 43.0]

En realidad encontré otra implementación en documentos de Python

def moving_average(iterable, n=3):
    # moving_average([40, 30, 50, 46, 39, 44]) --> 40.0 42.0 45.0 43.0
    # http://en.wikipedia.org/wiki/Moving_average
    it = iter(iterable)
    d = deque(itertools.islice(it, n-1))
    d.appendleft(0)
    s = sum(d)
    for elem in it:
        s += elem - d.popleft()
        d.append(elem)
        yield s / n

Sin embargo, la implementación me parece un poco más compleja de lo que debería ser. Pero debe estar en los documentos estándar de Python por una razón, ¿alguien podría comentar sobre la implementación del mío y el documento estándar?

MaThMaX
fuente
2
Una gran diferencia es que sigue sumando los miembros de la ventana en cada iteración, y ellos actualizan eficientemente la suma (elimine un miembro y agregue otro). en términos de complejidad, está haciendo O(n*d) cálculos ( dsiendo el tamaño de la ventana, el ntamaño de iterable) y lo están haciendoO(n)
Iftah
@ Iftah, bien, gracias por la explicación, tienes razón.
MaThMaX
2

Con las variables de @ Aikude, escribí one-liner.

import numpy as np

mylist = [1, 2, 3, 4, 5, 6, 7]
N = 3

mean = [np.mean(mylist[x:x+N]) for x in range(len(mylist)-N+1)]
print(mean)

>>> [2.0, 3.0, 4.0, 5.0, 6.0]
greentec
fuente
1

Aunque hay soluciones para esta pregunta aquí, eche un vistazo a mi solución. Es muy simple y funciona bien.

import numpy as np
dataset = np.asarray([1, 2, 3, 4, 5, 6, 7])
ma = list()
window = 3
for t in range(0, len(dataset)):
    if t+window <= len(dataset):
        indices = range(t, t+window)
        ma.append(np.average(np.take(dataset, indices)))
else:
    ma = np.asarray(ma)
Ayberk Yavuz
fuente
1

Al leer las otras respuestas, no creo que esto sea lo que pedía la pregunta, pero llegué aquí con la necesidad de mantener un promedio de una lista de valores que crecía en tamaño.

Entonces, si desea mantener una lista de valores que está adquiriendo desde algún lugar (un sitio, un dispositivo de medición, etc.) y el promedio de los últimos nvalores actualizados, puede usar el siguiente código, que minimiza el esfuerzo de agregar nuevos elementos:

class Running_Average(object):
    def __init__(self, buffer_size=10):
        """
        Create a new Running_Average object.

        This object allows the efficient calculation of the average of the last
        `buffer_size` numbers added to it.

        Examples
        --------
        >>> a = Running_Average(2)
        >>> a.add(1)
        >>> a.get()
        1.0
        >>> a.add(1)  # there are two 1 in buffer
        >>> a.get()
        1.0
        >>> a.add(2)  # there's a 1 and a 2 in the buffer
        >>> a.get()
        1.5
        >>> a.add(2)
        >>> a.get()  # now there's only two 2 in the buffer
        2.0
        """
        self._buffer_size = int(buffer_size)  # make sure it's an int
        self.reset()

    def add(self, new):
        """
        Add a new number to the buffer, or replaces the oldest one there.
        """
        new = float(new)  # make sure it's a float
        n = len(self._buffer)
        if n < self.buffer_size:  # still have to had numbers to the buffer.
            self._buffer.append(new)
            if self._average != self._average:  # ~ if isNaN().
                self._average = new  # no previous numbers, so it's new.
            else:
                self._average *= n  # so it's only the sum of numbers.
                self._average += new  # add new number.
                self._average /= (n+1)  # divide by new number of numbers.
        else:  # buffer full, replace oldest value.
            old = self._buffer[self._index]  # the previous oldest number.
            self._buffer[self._index] = new  # replace with new one.
            self._index += 1  # update the index and make sure it's...
            self._index %= self.buffer_size  # ... smaller than buffer_size.
            self._average -= old/self.buffer_size  # remove old one...
            self._average += new/self.buffer_size  # ...and add new one...
            # ... weighted by the number of elements.

    def __call__(self):
        """
        Return the moving average value, for the lazy ones who don't want
        to write .get .
        """
        return self._average

    def get(self):
        """
        Return the moving average value.
        """
        return self()

    def reset(self):
        """
        Reset the moving average.

        If for some reason you don't want to just create a new one.
        """
        self._buffer = []  # could use np.empty(self.buffer_size)...
        self._index = 0  # and use this to keep track of how many numbers.
        self._average = float('nan')  # could use np.NaN .

    def get_buffer_size(self):
        """
        Return current buffer_size.
        """
        return self._buffer_size

    def set_buffer_size(self, buffer_size):
        """
        >>> a = Running_Average(10)
        >>> for i in range(15):
        ...     a.add(i)
        ...
        >>> a()
        9.5
        >>> a._buffer  # should not access this!!
        [10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]

        Decreasing buffer size:
        >>> a.buffer_size = 6
        >>> a._buffer  # should not access this!!
        [9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
        >>> a.buffer_size = 2
        >>> a._buffer
        [13.0, 14.0]

        Increasing buffer size:
        >>> a.buffer_size = 5
        Warning: no older data available!
        >>> a._buffer
        [13.0, 14.0]

        Keeping buffer size:
        >>> a = Running_Average(10)
        >>> for i in range(15):
        ...     a.add(i)
        ...
        >>> a()
        9.5
        >>> a._buffer  # should not access this!!
        [10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]
        >>> a.buffer_size = 10  # reorders buffer!
        >>> a._buffer
        [5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
        """
        buffer_size = int(buffer_size)
        # order the buffer so index is zero again:
        new_buffer = self._buffer[self._index:]
        new_buffer.extend(self._buffer[:self._index])
        self._index = 0
        if self._buffer_size < buffer_size:
            print('Warning: no older data available!')  # should use Warnings!
        else:
            diff = self._buffer_size - buffer_size
            print(diff)
            new_buffer = new_buffer[diff:]
        self._buffer_size = buffer_size
        self._buffer = new_buffer

    buffer_size = property(get_buffer_size, set_buffer_size)

Y puedes probarlo con, por ejemplo:

def graph_test(N=200):
    import matplotlib.pyplot as plt
    values = list(range(N))
    values_average_calculator = Running_Average(N/2)
    values_averages = []
    for value in values:
        values_average_calculator.add(value)
        values_averages.append(values_average_calculator())
    fig, ax = plt.subplots(1, 1)
    ax.plot(values, label='values')
    ax.plot(values_averages, label='averages')
    ax.grid()
    ax.set_xlim(0, N)
    ax.set_ylim(0, N)
    fig.show()

Lo que da:

Valores y su promedio en función de los valores #

berna1111
fuente
1

Otra solución simplemente usando una biblioteca estándar y deque:

from collections import deque
import itertools

def moving_average(iterable, n=3):
    # http://en.wikipedia.org/wiki/Moving_average
    it = iter(iterable) 
    # create an iterable object from input argument
    d = deque(itertools.islice(it, n-1))  
    # create deque object by slicing iterable
    d.appendleft(0)
    s = sum(d)
    for elem in it:
        s += elem - d.popleft()
        d.append(elem)
        yield s / n

# example on how to use it
for i in  moving_average([40, 30, 50, 46, 39, 44]):
    print(i)

# 40.0
# 42.0
# 45.0
# 43.0
Vlad Bezden
fuente
1

Para fines educativos, permítanme agregar dos soluciones Numpy más (que son más lentas que la solución cumsum):

import numpy as np
from numpy.lib.stride_tricks import as_strided

def ra_strides(arr, window):
    ''' Running average using as_strided'''
    n = arr.shape[0] - window + 1
    arr_strided = as_strided(arr, shape=[n, window], strides=2*arr.strides)
    return arr_strided.mean(axis=1)

def ra_add(arr, window):
    ''' Running average using add.reduceat'''
    n = arr.shape[0] - window + 1
    indices = np.array([0, window]*n) + np.repeat(np.arange(n), 2)
    arr = np.append(arr, 0)
    return np.add.reduceat(arr, indices )[::2]/window

Funciones utilizadas: as_strided , add.reduceat

Andreas K.
fuente
1

Todas las soluciones mencionadas son pobres porque carecen de

  • velocidad debido a una pitón nativa en lugar de una implementación vectorizada numpy,
  • estabilidad numérica debido al mal uso de numpy.cumsum, o
  • velocidad debido a O(len(x) * w)implementaciones como convoluciones.

Dado

import numpy
m = 10000
x = numpy.random.rand(m)
w = 1000

Tenga en cuenta que x_[:w].sum()es igual x[:w-1].sum(). Así, por primera media la numpy.cumsum(...)suma x[w] / w(a través x_[w+1] / w), y resta 0(de x_[0] / w). Esto resulta enx[0:w].mean()

A través de cumsum, actualizará el segundo promedio agregando x[w+1] / wy restando adicionalmente x[0] / w, lo que da como resultado x[1:w+1].mean().

Esto continúa hasta que x[-w:].mean()se alcanza.

x_ = numpy.insert(x, 0, 0)
sliding_average = x_[:w].sum() / w + numpy.cumsum(x_[w:] - x_[:-w]) / w

Esta solución es vectorizada O(m), legible y numéricamente estable.

Herbert
fuente
1

¿Qué tal un filtro de media móvil ? También es un trazador de líneas y tiene la ventaja de que puede manipular fácilmente el tipo de ventana si necesita algo más que el rectángulo, es decir. un promedio móvil simple de N de una matriz a:

lfilter(np.ones(N)/N, [1], a)[N:]

Y con la ventana triangular aplicada:

lfilter(np.ones(N)*scipy.signal.triang(N)/N, [1], a)[N:]

Nota: Por lo general, descarto las primeras N muestras como falsas, por lo tanto, [N:]al final, pero no es necesario y solo es una elección personal.

mac13k
fuente
-7

Si elige rodar la suya, en lugar de usar una biblioteca existente, tenga en cuenta el error de coma flotante y trate de minimizar sus efectos:

class SumAccumulator:
    def __init__(self):
        self.values = [0]
        self.count = 0

    def add( self, val ):
        self.values.append( val )
        self.count = self.count + 1
        i = self.count
        while i & 0x01:
            i = i >> 1
            v0 = self.values.pop()
            v1 = self.values.pop()
            self.values.append( v0 + v1 )

    def get_total(self):
        return sum( reversed(self.values) )

    def get_size( self ):
        return self.count

Si todos sus valores son aproximadamente del mismo orden de magnitud, esto ayudará a preservar la precisión al agregar siempre valores de magnitudes más o menos similares.

Mayur Patel
fuente
15
Esta es una respuesta terriblemente confusa, al menos algún comentario en el código o explicación de por qué esto ayuda a un error de coma flotante sería bueno.
Gabe
En mi última oración estaba tratando de indicar por qué ayuda el error de coma flotante. Si dos valores son aproximadamente del mismo orden de magnitud, sumarlos pierde menos precisión que si agrega un número muy grande a uno muy pequeño. El código combina valores "adyacentes" de manera que incluso las sumas intermedias siempre deben ser razonablemente cercanas en magnitud, para minimizar el error de coma flotante. Nada es infalible, pero este método ha ahorrado un par de proyectos muy mal implementados en producción.
Mayur Patel
1. aplicado al problema original, esto sería terriblemente lento (promedio informático), por lo que esto es irrelevante 2. para sufrir el problema de precisión de los números de 64 bits, uno tiene que sumar >> 2 ^ 30 de casi números iguales
Alleo
@Alleo: en lugar de hacer una adición por valor, harás dos. La prueba es la misma que el problema de cambio de bits. Sin embargo, el punto de esta respuesta no es necesariamente el rendimiento, sino la precisión. El uso de memoria para promediar valores de 64 bits no excedería los 64 elementos en la memoria caché, por lo que también es amigable en el uso de memoria.
Mayur Patel
Sí, tiene razón en que esto requiere 2 veces más operaciones que la suma simple, pero el problema original es el cálculo de la media , no solo la suma. Lo que se puede hacer en O (n), pero su respuesta requiere O (mn), donde m es el tamaño de la ventana.
Alleo