이동 평균 또는 평균
특정 창에서 주어진 1D 배열의 평균을 계산하는 파이썬 용 scipy 함수 또는 numpy 함수 또는 모듈이 있습니까?
종속성없이 하나의 루프에서 모든 것을 수행하는 짧고 빠른 솔루션의 경우 아래 코드가 훌륭하게 작동합니다.
mylist = [1, 2, 3, 4, 5, 6, 7]
N = 3
cumsum, moving_aves = [0], []
for i, x in enumerate(mylist, 1):
cumsum.append(cumsum[i-1] + x)
if i>=N:
moving_ave = (cumsum[i] - cumsum[i-N])/N
#can do stuff with moving_ave here
moving_aves.append(moving_ave)
UPD : Alleo 와 jasaarim 이보다 효율적인 솔루션을 제안했습니다 .
당신은 np.convolve
그것을 위해 사용할 수 있습니다 :
np.convolve(x, np.ones((N,))/N, mode='valid')
설명
연속 평균은 컨볼 루션 의 수학 연산의 경우입니다 . 연속 평균의 경우 입력을 따라 창을 밀고 창 내용의 평균을 계산합니다. 불연속 1D 신호의 경우, 임의의 선형 조합을 계산하는 평균 대신 각 요소에 해당 계수를 곱하고 결과를 더하는 것을 제외하고 컨볼 루션은 동일합니다. 창의 각 위치마다 하나씩 해당 계수를 컨볼 루션 커널 이라고합니다 . 이제 N 값의 산술 평균은입니다 (x_1 + x_2 + ... + x_N) / N
. 따라서 해당 커널은 입니다. (1/N, 1/N, ..., 1/N)
이것이 바로 우리가 사용하는 것 np.ones((N,))/N
입니다.
가장자리
mode
의 인수 np.convolve
를 지정하는 방법 가장자리를 처리합니다. 나는 valid
대부분의 사람들이 달리기 평균이 작동하는 것을 기대한다고 생각하기 때문에 여기 에서 모드를 선택 했지만 다른 우선 순위가있을 수 있습니다. 다음은 모드 간의 차이점을 보여주는 그림입니다.
import numpy as np
import matplotlib.pyplot as plt
modes = ['full', 'same', 'valid']
for m in modes:
plt.plot(np.convolve(np.ones((200,)), np.ones((50,))/50, mode=m));
plt.axis([-10, 251, -.1, 1.1]);
plt.legend(modes, loc='lower center');
plt.show()
효율적인 솔루션
컨볼 루션은 간단한 접근법보다 훨씬 낫지 만 FFT를 사용하므로 상당히 느립니다. 그러나 달리기 계산을 위해 특별히 다음과 같은 접근 방식이 잘 작동합니다.
def running_mean(x, N):
cumsum = numpy.cumsum(numpy.insert(x, 0, 0))
return (cumsum[N:] - cumsum[:-N]) / float(N)
확인할 코드
In[3]: x = numpy.random.random(100000)
In[4]: N = 1000
In[5]: %timeit result1 = numpy.convolve(x, numpy.ones((N,))/N, mode='valid')
10 loops, best of 3: 41.4 ms per loop
In[6]: %timeit result2 = running_mean(x, N)
1000 loops, best of 3: 1.04 ms per loop
참고 numpy.allclose(result1, result2)
되어 True
, 두 가지 방법은 동일하다. N이 클수록 시간의 차이가 큽니다.
업데이트 : 아래 예 pandas.rolling_mean
는 최신 버전의 팬더에서 제거 된 이전 기능을 보여줍니다 . 아래의 함수 호출과 같은 현대식은
In [8]: pd.Series(x).rolling(window=N).mean().iloc[N-1:].values
Out[8]:
array([ 0.49815397, 0.49844183, 0.49840518, ..., 0.49488191,
0.49456679, 0.49427121])
팬더 는 NumPy 또는 SciPy보다 더 적합합니다. rolling_mean 함수 는 작업을 편리하게 수행합니다. 또한 입력이 배열 인 경우 NumPy 배열을 반환합니다.
rolling_mean
사용자 정의 순수 Python 구현으로 성능 을 능가하는 것은 어렵습니다 . 제안 된 두 가지 솔루션에 대한 성능 예는 다음과 같습니다.
In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: def running_mean(x, N):
...: cumsum = np.cumsum(np.insert(x, 0, 0))
...: return (cumsum[N:] - cumsum[:-N]) / N
...:
In [4]: x = np.random.random(100000)
In [5]: N = 1000
In [6]: %timeit np.convolve(x, np.ones((N,))/N, mode='valid')
10 loops, best of 3: 172 ms per loop
In [7]: %timeit running_mean(x, N)
100 loops, best of 3: 6.72 ms per loop
In [8]: %timeit pd.rolling_mean(x, N)[N-1:]
100 loops, best of 3: 4.74 ms per loop
In [9]: np.allclose(pd.rolling_mean(x, N)[N-1:], running_mean(x, N))
Out[9]: True
가장자리 값을 처리하는 방법에 대한 멋진 옵션도 있습니다.
다음을 사용하여 누적 평균을 계산할 수 있습니다.
import numpy as np
def runningMean(x, N):
y = np.zeros((len(x),))
for ctr in range(len(x)):
y[ctr] = np.sum(x[ctr:(ctr+N)])
return y/N
그러나 느리다.
다행히 numpy에는 속도를 높이기 위해 사용할 수 있는 convolve 함수가 포함되어 있습니다. 연속 평균은 길이 x
가 N
긴 벡터를 사용 하여 모든 구성원 이 균등화 하는 것과 같습니다 1/N
. numpy convolve 구현에는 시작 과도가 포함되므로 첫 번째 N-1 점을 제거해야합니다.
def runningMeanFast(x, N):
return np.convolve(x, np.ones((N,))/N)[(N-1):]
내 컴퓨터에서 빠른 버전은 입력 벡터의 길이와 평균화 창의 크기에 따라 20-30 배 빠릅니다.
convolve에는 'same'
시작 일시적 문제를 해결해야하는 것처럼 보이는 모드 가 포함되어 있지만 시작과 끝으로 분할됩니다.
또는 파이썬을위한 모듈
Tradewave.net의 테스트에서 TA-lib는 항상 다음과 같이 승리합니다.
import talib as ta
import numpy as np
import pandas as pd
import scipy
from scipy import signal
import time as t
PAIR = info.primary_pair
PERIOD = 30
def initialize():
storage.reset()
storage.elapsed = storage.get('elapsed', [0,0,0,0,0,0])
def cumsum_sma(array, period):
ret = np.cumsum(array, dtype=float)
ret[period:] = ret[period:] - ret[:-period]
return ret[period - 1:] / period
def pandas_sma(array, period):
return pd.rolling_mean(array, period)
def api_sma(array, period):
# this method is native to Tradewave and does NOT return an array
return (data[PAIR].ma(PERIOD))
def talib_sma(array, period):
return ta.MA(array, period)
def convolve_sma(array, period):
return np.convolve(array, np.ones((period,))/period, mode='valid')
def fftconvolve_sma(array, period):
return scipy.signal.fftconvolve(
array, np.ones((period,))/period, mode='valid')
def tick():
close = data[PAIR].warmup_period('close')
t1 = t.time()
sma_api = api_sma(close, PERIOD)
t2 = t.time()
sma_cumsum = cumsum_sma(close, PERIOD)
t3 = t.time()
sma_pandas = pandas_sma(close, PERIOD)
t4 = t.time()
sma_talib = talib_sma(close, PERIOD)
t5 = t.time()
sma_convolve = convolve_sma(close, PERIOD)
t6 = t.time()
sma_fftconvolve = fftconvolve_sma(close, PERIOD)
t7 = t.time()
storage.elapsed[-1] = storage.elapsed[-1] + t2-t1
storage.elapsed[-2] = storage.elapsed[-2] + t3-t2
storage.elapsed[-3] = storage.elapsed[-3] + t4-t3
storage.elapsed[-4] = storage.elapsed[-4] + t5-t4
storage.elapsed[-5] = storage.elapsed[-5] + t6-t5
storage.elapsed[-6] = storage.elapsed[-6] + t7-t6
plot('sma_api', sma_api)
plot('sma_cumsum', sma_cumsum[-5])
plot('sma_pandas', sma_pandas[-10])
plot('sma_talib', sma_talib[-15])
plot('sma_convolve', sma_convolve[-20])
plot('sma_fftconvolve', sma_fftconvolve[-25])
def stop():
log('ticks....: %s' % info.max_ticks)
log('api......: %.5f' % storage.elapsed[-1])
log('cumsum...: %.5f' % storage.elapsed[-2])
log('pandas...: %.5f' % storage.elapsed[-3])
log('talib....: %.5f' % storage.elapsed[-4])
log('convolve.: %.5f' % storage.elapsed[-5])
log('fft......: %.5f' % storage.elapsed[-6])
결과 :
[2015-01-31 23:00:00] ticks....: 744
[2015-01-31 23:00:00] api......: 0.16445
[2015-01-31 23:00:00] cumsum...: 0.03189
[2015-01-31 23:00:00] pandas...: 0.03677
[2015-01-31 23:00:00] talib....: 0.00700 # <<< Winner!
[2015-01-31 23:00:00] convolve.: 0.04871
[2015-01-31 23:00:00] fft......: 0.22306
즉시 사용 가능한 솔루션은 https://scipy-cookbook.readthedocs.io/items/SignalSmooth.html을 참조 하십시오 . flat
창 유형에 평균을 제공 합니다. 이것은 간단한 do-it-self-convolve-method보다 약간 더 정교합니다. 데이터를 처음부터 끝까지 반영하여 데이터를 반영하려고하기 때문에 (귀하의 경우에는 작동하지 않을 수도 있습니다.) ..).
우선 다음을 시도해보십시오.
a = np.random.random(100)
plt.plot(a)
b = smooth(a, window='flat')
plt.plot(b)
나는 이것이 오래된 질문이라는 것을 알고 있지만 여기에 여분의 데이터 구조 또는 라이브러리를 사용하지 않는 솔루션이 있습니다. 입력 목록의 요소 수는 선형이며 더 효율적으로 만드는 다른 방법은 생각할 수 없습니다 (실제로 누군가가 결과를 할당하는 더 좋은 방법을 알고 있다면 알려주십시오).
참고 : 이것은 목록 대신 numpy 배열을 사용하는 것이 훨씬 빠르지 만 모든 종속성을 제거하고 싶었습니다. 멀티 스레드 실행으로 성능을 향상시킬 수도 있습니다
이 함수는 입력 목록이 1 차원이라고 가정하므로주의하십시오.
### Running mean/Moving average
def running_mean(l, N):
sum = 0
result = list( 0 for x in l)
for i in range( 0, N ):
sum = sum + l[i]
result[i] = sum / (i+1)
for i in range( N, len(l) ):
sum = sum - l[i-N] + l[i]
result[i] = sum / N
return result
예
우리는 목록이 있다고 가정 data = [ 1, 2, 3, 4, 5, 6 ]
우리가 3의 기간 롤링 평균을 계산하고자하는, 그리고 당신은 또한 입력 한 같은 크기의 출력 목록을 원하는 (즉, 가장 빈번 경우).
첫 번째 요소에는 인덱스 0이 있으므로 롤링 평균은 인덱스 -2, -1 및 0의 요소에서 계산되어야합니다. 분명히 데이터 [-2] 및 데이터 [-1]이 없습니다 (특별하게 사용하지 않으려는 경우 제외) 경계 조건), 따라서 우리는 해당 요소가 0이라고 가정합니다. 이것은 실제로 패딩하지 않고 패딩이 필요한 인덱스 (0에서 N-1까지)를 추적하는 것을 제외하고는 목록을 0으로 채우는 것과 같습니다.
따라서 첫 번째 N 요소의 경우 누적기에 요소를 계속 추가합니다.
result[0] = (0 + 0 + 1) / 3 = 0.333 == (sum + 1) / 3
result[1] = (0 + 1 + 2) / 3 = 1 == (sum + 2) / 3
result[2] = (1 + 2 + 3) / 3 = 2 == (sum + 3) / 3
요소 N + 1부터 간단한 누적이 작동하지 않습니다. 우리는 기대 result[3] = (2 + 3 + 4)/3 = 3
하지만 이것은와 다릅니다 (sum + 4)/3 = 3.333
.
올바른 값을 계산하는 방법은 빼기하는 것입니다 data[0] = 1
로부터 sum+4
따라서 제공 sum + 4 - 1 = 9
.
이것은 현재 때문에 발생 sum = data[0] + data[1] + data[2]
하지만 i >= N
빼기 전에는이므로 모든 경우에도 마찬가지 sum
입니다 data[i-N] + ... + data[i-2] + data[i-1]
.
출력을 'valid'
컨볼 루션 영역으로 제한하는 대신 입력의 치수를 유지하는 것이 중요한 경우 scipy.ndimage.filters.uniform_filter1d 를 사용할 수 있습니다 .
import numpy as np
from scipy.ndimage.filters import uniform_filter1d
N = 1000
x = np.random.random(100000)
y = uniform_filter1d(x, size=N)
y.shape == x.shape
>>> True
uniform_filter1d
여러 가지 방법으로 테두리를 'reflect'
기본값 으로 처리 할 수 있지만 제 경우에는 오히려 원했습니다 'nearest'
.
또한 다소 빠릅니다 (보다 거의 50 배 빠름 np.convolve
).
%timeit y1 = np.convolve(x, np.ones((N,))/N, mode='same')
100 loops, best of 3: 9.28 ms per loop
%timeit y2 = uniform_filter1d(x, size=N)
10000 loops, best of 3: 191 µs per loop
나는 이것이 얼마나 빠른지 아직 확인하지 않았지만 시도해 볼 수 있습니다.
from collections import deque
cache = deque() # keep track of seen values
n = 10 # window size
A = xrange(100) # some dummy iterable
cum_sum = 0 # initialize cumulative sum
for t, val in enumerate(A, 1):
cache.append(val)
cum_sum += val
if t < n:
avg = cum_sum / float(t)
else: # if window is saturated,
cum_sum -= cache.popleft() # subtract oldest value
avg = cum_sum / float(n)
병목 현상을 사용하여 우아하게 해결할 수 있다고 생각합니다.
아래의 기본 샘플을 참조하십시오.
import numpy as np
import bottleneck as bn
a = np.random.randint(4, 1000, size=100)
mm = bn.move_mean(a, window=5, min_count=1)
"mm"은 "a"의 이동 평균입니다.
"창"은 이동 평균에 대해 고려해야 할 최대 항목 수입니다.
"min_count"는 이동 평균 (예 : 처음 몇 요소 또는 배열에 nan 값이있는 경우)으로 고려해야 할 최소 항목 수입니다.
좋은 점은 병목 현상이 난 값을 처리하는 데 도움이되며 매우 효율적이라는 것입니다.
파티에 조금 늦었지만 끝이나 패드 주위를 감싸지 않는 제 자신의 작은 기능을 만들어 평균을 찾는 데 사용했습니다. 추가 처리는 선형 간격의 지점에서 신호를 다시 샘플링하는 것입니다. 다른 기능을 사용하려면 마음대로 코드를 사용자 정의하십시오.
이 방법은 정규화 된 가우스 커널을 사용한 간단한 행렬 곱셈입니다.
def running_mean(y_in, x_in, N_out=101, sigma=1):
'''
Returns running mean as a Bell-curve weighted average at evenly spaced
points. Does NOT wrap signal around, or pad with zeros.
Arguments:
y_in -- y values, the values to be smoothed and re-sampled
x_in -- x values for array
Keyword arguments:
N_out -- NoOf elements in resampled array.
sigma -- 'Width' of Bell-curve in units of param x .
'''
N_in = size(y_in)
# Gaussian kernel
x_out = np.linspace(np.min(x_in), np.max(x_in), N_out)
x_in_mesh, x_out_mesh = np.meshgrid(x_in, x_out)
gauss_kernel = np.exp(-np.square(x_in_mesh - x_out_mesh) / (2 * sigma**2))
# Normalize kernel, such that the sum is one along axis 1
normalization = np.tile(np.reshape(sum(gauss_kernel, axis=1), (N_out, 1)), (1, N_in))
gauss_kernel_normalized = gauss_kernel / normalization
# Perform running average as a linear operation
y_out = gauss_kernel_normalized @ y_in
return y_out, x_out
정규 분포 노이즈가 추가 된 정현파 신호에 대한 간단한 사용법 :
numpy 또는 scipy 대신 팬더가 더 신속 하게이 작업을 수행하도록 권장합니다.
df['data'].rolling(3).mean()
이것은 열 "데이터"의 3주기의 이동 평균 (MA)을 취합니다. 시프트 된 버전을 계산할 수도 있습니다. 예를 들어 현재 셀을 제외하고 (뒤로 시프트 한) 버전은 다음과 같이 쉽게 계산할 수 있습니다.
df['data'].shift(periods=1).rolling(3).mean()
numpy, panda 를 사용 하지 않고 이동 평균을 찾는 또 다른 방법
import itertools
sample = [2, 6, 10, 8, 11, 10]
list(itertools.starmap(lambda a,b: b/a,
enumerate(itertools.accumulate(sample), 1)))
인쇄합니다 [2.0, 4.0, 6.0, 6.5, 7.4, 7.833333333333333]
이 질문은 NeXuS가 지난 달에 작성했을 때보 다 훨씬 오래 되었지만, 그의 코드가 엣지 케이스를 처리하는 방식이 마음에 듭니다. 그러나 "단순 이동 평균"이기 때문에 결과가 적용되는 데이터보다 뒤떨어집니다. 나는 NumPy와의 모드보다 더 만족하게 가장자리 경우 다루는 생각 valid
, same
그리고 full
A와 유사한 접근 방식을 적용함으로써 달성 될 수있는 convolution()
기반 방법.
내 기여는 중앙 실행 평균을 사용하여 결과를 데이터와 일치시킵니다. 전체 크기 창을 사용할 수있는 포인트가 너무 적 으면 배열의 가장자리에있는 연속적으로 작은 창에서 실행 평균이 계산됩니다. [실제로 더 큰 창문에서 실제로는 구현 세부 사항입니다.]
import numpy as np
def running_mean(l, N):
# Also works for the(strictly invalid) cases when N is even.
if (N//2)*2 == N:
N = N - 1
front = np.zeros(N//2)
back = np.zeros(N//2)
for i in range(1, (N//2)*2, 2):
front[i//2] = np.convolve(l[:i], np.ones((i,))/i, mode = 'valid')
for i in range(1, (N//2)*2, 2):
back[i//2] = np.convolve(l[-i:], np.ones((i,))/i, mode = 'valid')
return np.concatenate([front, np.convolve(l, np.ones((N,))/N, mode = 'valid'), back[::-1]])
사용하기 때문에 상대적으로 느리며 convolve()
진정한 Pythonista에 의해 상당히 많이 퍼질 수 있지만 아이디어는 유효하다고 생각합니다.
이 답변에는 세 가지 시나리오에 Python 표준 라이브러리 를 사용하는 솔루션이 포함되어 있습니다.
와 평균을 실행 itertools.accumulate
이것은 메모리를 활용하여 반복 가능한 값에 대한 실행 평균을 계산하는 메모리 효율적인 Python 3.2+ 솔루션 itertools.accumulate
입니다.
>>> from itertools import accumulate
>>> values = range(100)
주 values
발전기 또는 즉시 값을 생성하는 다른 객체를 포함한 모든 반복 가능한 수있다.
먼저 값의 누적 합계를 느리게 구성하십시오.
>>> cumu_sum = accumulate(value_stream)
그런 다음 enumerate
누적 합계 (1부터 시작)를 누적 값의 비율과 현재 열거 인덱스를 생성하는 생성기를 구성합니다.
>>> rolling_avg = (accu/i for i, accu in enumerate(cumu_sum, 1))
means = list(rolling_avg)
메모리의 모든 값을 한 번에 필요하거나 next
증 분식으로 호출하면 문제가 발생할 수 있습니다 .
(물론, 당신은 또한 반복 할 수 rolling_avg
로 for
호출 루프, next
암시.)
>>> next(rolling_avg) # 0/1
>>> 0.0
>>> next(rolling_avg) # (0 + 1)/2
>>> 0.5
>>> next(rolling_avg) # (0 + 1 + 2)/3
>>> 1.0
이 솔루션은 다음과 같이 함수로 작성할 수 있습니다.
from itertools import accumulate
def rolling_avg(iterable):
cumu_sum = accumulate(iterable)
yield from (accu/i for i, accu in enumerate(cumu_sum, 1))
언제든지 값을 보낼 수 있는 코 루틴
이 코 루틴은 전송 한 값을 소비하고 지금까지 본 값의 평균을 유지합니다.
반복 가능한 값이 없지만 프로그램 수명 내내 다른 시간에 하나씩 평균값을 구할 때 유용합니다.
def rolling_avg_coro():
i = 0
total = 0.0
avg = None
while True:
next_value = yield avg
i += 1
total += next_value
avg = total/i
코 루틴은 다음과 같이 작동합니다.
>>> averager = rolling_avg_coro() # instantiate coroutine
>>> next(averager) # get coroutine going (this is called priming)
>>>
>>> averager.send(5) # 5/1
>>> 5.0
>>> averager.send(3) # (5 + 3)/2
>>> 4.0
>>> print('doing something else...')
doing something else...
>>> averager.send(13) # (5 + 3 + 13)/3
>>> 7.0
슬라이딩 윈도우 크기의 평균 계산 N
이 생성기 함수는 iterable 및 창 크기 N
를 사용하여 창 내부의 현재 값에 대한 평균을 산출합니다. 그것은 사용 deque
목록과 유사하지만, 고속 변형 (최적화 된 자료 구조 인 pop
, append
) 양자 모두가 끝점 .
from collections import deque
from itertools import islice
def sliding_avg(iterable, N):
it = iter(iterable)
window = deque(islice(it, N))
num_vals = len(window)
if num_vals < N:
msg = 'window size {} exceeds total number of values {}'
raise ValueError(msg.format(N, num_vals))
N = float(N) # force floating point division if using Python 2
s = sum(window)
while True:
yield s/N
try:
nxt = next(it)
except StopIteration:
break
s = s - window.popleft() + nxt
window.append(nxt)
작동하는 기능은 다음과 같습니다.
>>> values = range(100)
>>> N = 5
>>> window_avg = sliding_avg(values, N)
>>>
>>> next(window_avg) # (0 + 1 + 2 + 3 + 4)/5
>>> 2.0
>>> next(window_avg) # (1 + 2 + 3 + 4 + 5)/5
>>> 3.0
>>> next(window_avg) # (2 + 3 + 4 + 5 + 6)/5
>>> 4.0
연속 평균 계산에 대한 위의 많은 답변이 있습니다. 내 대답은 두 가지 추가 기능을 추가합니다.
- nan 값을 무시합니다
- 관심 값 자체를 포함하지 않는 N 개의 주변 값에 대한 평균을 계산합니다.
이 두 번째 특징은 어떤 값이 일반적인 추세와 어떤 값으로 다른지 결정하는 데 특히 유용합니다.
numpy.cumsum이 가장 시간 효율적인 방법이므로 사용합니다 ( 위의 Alleo 's answer 참조 ).
N=10 # number of points to test on each side of point of interest, best if even
padded_x = np.insert(np.insert( np.insert(x, len(x), np.empty(int(N/2))*np.nan), 0, np.empty(int(N/2))*np.nan ),0,0)
n_nan = np.cumsum(np.isnan(padded_x))
cumsum = np.nancumsum(padded_x)
window_sum = cumsum[N+1:] - cumsum[:-(N+1)] - x # subtract value of interest from sum of all values within window
window_n_nan = n_nan[N+1:] - n_nan[:-(N+1)] - np.isnan(x)
window_n_values = (N - window_n_nan)
movavg = (window_sum) / (window_n_values)
이 코드는 N까지만 작동합니다. padded_x 및 n_nan의 np.insert를 변경하여 홀수를 조정할 수 있습니다.
이 코드는 컷오프 = 3이 아닌 값보다 작은 값으로 계산 된 모든 이동 평균 값을 제거하도록 쉽게 조정할 수 있습니다.
window_n_values = (N - window_n_nan).astype(float) # dtype must be float to set some values to nan
cutoff = 3
window_n_values[window_n_values<cutoff] = np.nan
movavg = (window_sum) / (window_n_values)
이 방법이있는 위 의 답변 중 하나에 묻힌 mab 의 의견 이 있습니다. 다음은 단순한 이동 평균입니다.bottleneck
move_mean
import numpy as np
import bottleneck as bn
a = np.arange(10) + np.random.random(10)
mva = bn.move_mean(a, window=2, min_count=1)
min_count
기본적으로 배열의 해당 지점까지 이동 평균을 취하는 편리한 매개 변수입니다. 설정하지 않으면 min_count
같고 점 window
까지 모든 것이됩니다 .window
nan
이 질문에 대한 해결책이 있지만 내 해결책을 살펴보십시오. 매우 간단하고 잘 작동합니다.
import numpy as np
dataset = np.asarray([1, 2, 3, 4, 5, 6, 7])
ma = list()
window = 3
for t in range(0, len(dataset)):
if t+window <= len(dataset):
indices = range(t, t+window)
ma.append(np.average(np.take(dataset, indices)))
else:
ma = np.asarray(ma)
다른 답변을 읽었을 때 이것이 이것이 요청한 것이라고 생각하지는 않지만 크기가 커지고있는 값 목록의 평균을 유지할 필요가 있습니다.
따라서 사이트, 측정 장치 등에서 수집 한 값 목록과 마지막으로 n
업데이트 된 평균 값을 유지하려는 경우 새 코드 추가 노력을 최소화하는 다음 코드를 사용할 수 있습니다. 집단:
class Running_Average(object):
def __init__(self, buffer_size=10):
"""
Create a new Running_Average object.
This object allows the efficient calculation of the average of the last
`buffer_size` numbers added to it.
Examples
--------
>>> a = Running_Average(2)
>>> a.add(1)
>>> a.get()
1.0
>>> a.add(1) # there are two 1 in buffer
>>> a.get()
1.0
>>> a.add(2) # there's a 1 and a 2 in the buffer
>>> a.get()
1.5
>>> a.add(2)
>>> a.get() # now there's only two 2 in the buffer
2.0
"""
self._buffer_size = int(buffer_size) # make sure it's an int
self.reset()
def add(self, new):
"""
Add a new number to the buffer, or replaces the oldest one there.
"""
new = float(new) # make sure it's a float
n = len(self._buffer)
if n < self.buffer_size: # still have to had numbers to the buffer.
self._buffer.append(new)
if self._average != self._average: # ~ if isNaN().
self._average = new # no previous numbers, so it's new.
else:
self._average *= n # so it's only the sum of numbers.
self._average += new # add new number.
self._average /= (n+1) # divide by new number of numbers.
else: # buffer full, replace oldest value.
old = self._buffer[self._index] # the previous oldest number.
self._buffer[self._index] = new # replace with new one.
self._index += 1 # update the index and make sure it's...
self._index %= self.buffer_size # ... smaller than buffer_size.
self._average -= old/self.buffer_size # remove old one...
self._average += new/self.buffer_size # ...and add new one...
# ... weighted by the number of elements.
def __call__(self):
"""
Return the moving average value, for the lazy ones who don't want
to write .get .
"""
return self._average
def get(self):
"""
Return the moving average value.
"""
return self()
def reset(self):
"""
Reset the moving average.
If for some reason you don't want to just create a new one.
"""
self._buffer = [] # could use np.empty(self.buffer_size)...
self._index = 0 # and use this to keep track of how many numbers.
self._average = float('nan') # could use np.NaN .
def get_buffer_size(self):
"""
Return current buffer_size.
"""
return self._buffer_size
def set_buffer_size(self, buffer_size):
"""
>>> a = Running_Average(10)
>>> for i in range(15):
... a.add(i)
...
>>> a()
9.5
>>> a._buffer # should not access this!!
[10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]
Decreasing buffer size:
>>> a.buffer_size = 6
>>> a._buffer # should not access this!!
[9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
>>> a.buffer_size = 2
>>> a._buffer
[13.0, 14.0]
Increasing buffer size:
>>> a.buffer_size = 5
Warning: no older data available!
>>> a._buffer
[13.0, 14.0]
Keeping buffer size:
>>> a = Running_Average(10)
>>> for i in range(15):
... a.add(i)
...
>>> a()
9.5
>>> a._buffer # should not access this!!
[10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]
>>> a.buffer_size = 10 # reorders buffer!
>>> a._buffer
[5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
"""
buffer_size = int(buffer_size)
# order the buffer so index is zero again:
new_buffer = self._buffer[self._index:]
new_buffer.extend(self._buffer[:self._index])
self._index = 0
if self._buffer_size < buffer_size:
print('Warning: no older data available!') # should use Warnings!
else:
diff = self._buffer_size - buffer_size
print(diff)
new_buffer = new_buffer[diff:]
self._buffer_size = buffer_size
self._buffer = new_buffer
buffer_size = property(get_buffer_size, set_buffer_size)
예를 들어 다음과 같이 테스트 할 수 있습니다.
def graph_test(N=200):
import matplotlib.pyplot as plt
values = list(range(N))
values_average_calculator = Running_Average(N/2)
values_averages = []
for value in values:
values_average_calculator.add(value)
values_averages.append(values_average_calculator())
fig, ax = plt.subplots(1, 1)
ax.plot(values, label='values')
ax.plot(values_averages, label='averages')
ax.grid()
ax.set_xlim(0, N)
ax.set_ylim(0, N)
fig.show()
다음을 제공합니다.
Python 표준 라이브러리 만 사용 (메모리 효율적)
표준 라이브러리 deque
만 사용하는 다른 버전을 제공하십시오 . 이 답변의 대부분이 사용하고 있는지 나에게 꽤 놀라운 pandas
나 numpy
.
def moving_average(iterable, n=3):
d = deque(maxlen=n)
for i in iterable:
d.append(i)
if len(d) == n:
yield sum(d)/n
r = moving_average([40, 30, 50, 46, 39, 44])
assert list(r) == [40.0, 42.0, 45.0, 43.0]
def moving_average(iterable, n=3):
# moving_average([40, 30, 50, 46, 39, 44]) --> 40.0 42.0 45.0 43.0
# http://en.wikipedia.org/wiki/Moving_average
it = iter(iterable)
d = deque(itertools.islice(it, n-1))
d.appendleft(0)
s = sum(d)
for elem in it:
s += elem - d.popleft()
d.append(elem)
yield s / n
그러나 구현은 나에게 좀 더 복잡해 보입니다. 그러나 이유는 표준 파이썬 문서에 있어야합니다. 누군가가 내 표준 및 표준 문서의 구현에 대해 언급 할 수 있습니까?
방법에 대한 이동 평균 필터 ? 또한 하나의 라이너이며 직사각형 이외의 것이 필요한 경우 창 유형을 쉽게 조작 할 수 있다는 장점이 있습니다. 배열의 N- 길이 간단한 이동 평균 a :
lfilter(np.ones(N)/N, [1], a)[N:]
그리고 삼각형 창이 적용된 상태에서 :
lfilter(np.ones(N)*scipy.signal.triang(N)/N, [1], a)[N:]
표준 라이브러리를 사용하고 deque를 사용하는 또 다른 솔루션 :
from collections import deque
import itertools
def moving_average(iterable, n=3):
# http://en.wikipedia.org/wiki/Moving_average
it = iter(iterable)
# create an iterable object from input argument
d = deque(itertools.islice(it, n-1))
# create deque object by slicing iterable
d.appendleft(0)
s = sum(d)
for elem in it:
s += elem - d.popleft()
d.append(elem)
yield s / n
# example on how to use it
for i in moving_average([40, 30, 50, 46, 39, 44]):
print(i)
# 40.0
# 42.0
# 45.0
# 43.0
교육 목적을 위해 Numpy 솔루션을 두 개 더 추가합니다 (cumsum 솔루션보다 느림).
import numpy as np
from numpy.lib.stride_tricks import as_strided
def ra_strides(arr, window):
''' Running average using as_strided'''
n = arr.shape[0] - window + 1
arr_strided = as_strided(arr, shape=[n, window], strides=2*arr.strides)
return arr_strided.mean(axis=1)
def ra_add(arr, window):
''' Running average using add.reduceat'''
n = arr.shape[0] - window + 1
indices = np.array([0, window]*n) + np.repeat(np.arange(n), 2)
arr = np.append(arr, 0)
return np.add.reduceat(arr, indices )[::2]/window
사용 된 함수 : as_strided , add.reduceat
기존 라이브러리를 사용하지 않고 자신의 롤을 선택하는 경우 부동 소수점 오류를 염두에두고 그 효과를 최소화하십시오.
class SumAccumulator:
def __init__(self):
self.values = [0]
self.count = 0
def add( self, val ):
self.values.append( val )
self.count = self.count + 1
i = self.count
while i & 0x01:
i = i >> 1
v0 = self.values.pop()
v1 = self.values.pop()
self.values.append( v0 + v1 )
def get_total(self):
return sum( reversed(self.values) )
def get_size( self ):
return self.count
모든 값이 대략 같은 크기의 순서 인 경우, 거의 비슷한 크기의 값을 항상 추가하여 정밀도를 유지하는 데 도움이됩니다.
참고 URL : https://stackoverflow.com/questions/13728392/moving-average-or-running-mean
'IT' 카테고리의 다른 글
자원의 DisplayName 속성? (0) | 2020.06.03 |
---|---|
액션 당 레일 레이아웃? (0) | 2020.06.03 |
div 요소 내부의 텍스트를 바꾸려면 어떻게합니까? (0) | 2020.06.03 |
확인란의 값을 가져 옵니까? (0) | 2020.06.03 |
자바 8 람다 무효 인수 (0) | 2020.06.03 |