時間序列特征提?。簭睦碚摰絇ython代碼實踐
時間序列是一種特殊的存在。這意味著你對表格數(shù)據(jù)或圖像進(jìn)行的許多轉(zhuǎn)換/操作/處理技術(shù)對于時間序列來說可能根本不起作用。
"特征提取"的想法是對我們擁有的數(shù)據(jù)進(jìn)行"加工",確保我們提取所有有意義的特征,以便下一步(通常是機(jī)器學(xué)習(xí)應(yīng)用)可以從中受益。也就是說它是一種通過提供重要特征并過濾掉所有不太重要的特征來"幫助"機(jī)器學(xué)習(xí)步驟的方法。
這是完整的特征提取過程:
對于表格數(shù)據(jù)和信號,他們的特征根本就不同,比如說峰和谷的概念,傅里葉變換或小波變換的想法,以及獨立分量分析(ICA)的概念只有在處理信號時才真正有意義。
目前有兩大類進(jìn)行特征提取的方法:
- 基于數(shù)據(jù)驅(qū)動的方法: 這些方法旨在僅通過觀察信號來提取特征。它們忽略機(jī)器學(xué)習(xí)步驟及其目標(biāo)(例如分類、預(yù)測或回歸),只看信號,對其進(jìn)行處理,并從中提取信息。
- 基于模型的方法: 這些方法著眼于整個流程,旨在為特定問題找到解決方案的特征。
數(shù)據(jù)驅(qū)動方法的優(yōu)點是它們通常在計算上簡單易用,不需要相應(yīng)的目標(biāo)輸出。它們的缺點是特征不是針對你的特定問題的。例如,對信號進(jìn)行傅里葉變換并將其用作特征可能不如在端到端模型中訓(xùn)練的特定特征那樣優(yōu)化。
在這篇文章中,為了簡單起見,我們將只討論數(shù)據(jù)驅(qū)動方法。并且討論將基于領(lǐng)域特定的方法、基于頻率的方法、基于時間的方法和基于統(tǒng)計的方法。
1、領(lǐng)域特定特征提取
提取特征的最佳方法是考慮特定的問題。例如,假設(shè)正在處理一個工程實驗的信號,我們真正關(guān)心的是t = 6s后的振幅。這些是特征提取在一般情況下并不真正有意義,但對特定情況實際上非常相關(guān)。這就是我們所說的領(lǐng)域特定特征提取。這里沒有太多的數(shù)學(xué)和編碼,但這就是它應(yīng)該是的樣子,因為它極度依賴于你的特定情況。
2、基于頻率的特征提取
這種方法與我們的時間序列/信號的譜分析有關(guān)。我們有一個自然域,自然域是查看信號的最簡單方式,它是時間域,意味著我們將信號視為給定時間的值(或向量)。
例如考慮這個信號,在其自然域中:
圖片
繪制它就可以得到:
這是自然(時間)域,也是我們數(shù)據(jù)集的最簡單域。
然后我們可以將其轉(zhuǎn)換為頻率域。 信號有三個周期性組件。頻率域的想法是將信號分解為其周期性組件的頻率、振幅和相位。
信號y(t)的傅里葉變換Y(k)如下:
這描述了頻率為k的分量的振幅和相位。在提取有意義特征方面,可以提取10個主要分量(振幅最高的)的振幅、相位和頻率值。這將是10x3個特征(振幅、頻率和相位 x 10),它們將根據(jù)譜信息描述時間序列。
這種方法可以擴(kuò)展。例如,可以將我們的信號分解,不是基于正弦/余弦函數(shù),而是基于小波,這是另一種形式的周期波。這種分解被稱為小波分解。
我們使用代碼來詳細(xì)解釋,首先從非常簡單的傅里葉變換開始。
首先,我們需要邀請一些朋友來參加派對:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
現(xiàn)在讓我們以這個信號為例:
plt.figure(figsize=(10,5))
x = np.linspace(-2*np.pi,2*np.pi,1000)
y = np.sin(x) + 0.4*np.cos(2*x) + 2*np.sin(3.2*x)
plt.plot(x,y,color='firebrick',lw=2)
plt.xlabel('Time (t)',fontsize=24)
plt.xticks(fontsize=14)
plt.yticks(fontsize=14)
plt.grid(alpha=0.4)
plt.ylabel('y',fontsize=24)
這個信號有三個主要組成部分。一個振幅 = 1和頻率 = 1,一個振幅 = 0.4和頻率 = 2,一個振幅 = 2和頻率 = 3.2。我們可以通過運行傅里葉變換來恢復(fù)它們:
import numpy as np
import matplotlib.pyplot as plt
# Generate the time-domain signal
x = np.linspace(-8*np.pi, 8*np.pi, 10000)
y = np.sin(x) + 0.4*np.cos(2*x) + 2*np.sin(3.2*x)
y= y -np.mean(y)
# Perform the Fourier Transform
Y = np.fft.fft(y)
# Calculate the frequency bins
frequencies = np.fft.fftfreq(len(x), d=(x[1] - x[0]) / (2*np.pi))
# Normalize the amplitude of the FFT
Y_abs = 2*np.abs(Y) / len(x)
# Zero out very small values to remove noise
Y_abs[Y_abs < 1e-6] = 0
relevant_frequencies = np.where((frequencies>0) & (frequencies<10))
Y_phase = np.angle(Y)[relevant_frequencies]
frequencies = frequencies[relevant_frequencies]
Y_abs = Y_abs[relevant_frequencies]
# Plot the magnitude of the Fourier Transform
plt.figure(figsize=(10, 6))
plt.plot(frequencies, Y_abs)
plt.xlim(0, 10) # Limit x-axis to focus on relevant frequencies
plt.xticks([3.2,1,2])
plt.title('Fourier Transform of the Signal')
plt.xlabel('Frequency (Hz)')
plt.ylabel('Magnitude')
plt.grid(True)
plt.show()
我們可以清楚地看到三個峰值,對應(yīng)的振幅和頻率。
可以用一個非常簡單的函數(shù)來完成所有工作
def extract_fft_features( y, x=None, num_features = 5,max_frequency = 10):
y= y -np.mean(y)
# Perform the Fourier Transform
Y = np.fft.fft(y)
# Calculate the frequency bins
if x is None:
x = np.linspace(0,len(y))
frequencies = np.fft.fftfreq(len(x), d=(x[1] - x[0]) / (2*np.pi))
Y_abs = 2*np.abs(Y) / len(x)
Y_abs[Y_abs < 1e-6] = 0
relevant_frequencies = np.where((frequencies>0) & (frequencies<max_frequency))
Y_phase = np.angle(Y)[relevant_frequencies]
frequencies = frequencies[relevant_frequencies]
Y_abs = Y_abs[relevant_frequencies]
largest_amplitudes = np.flip(np.argsort(Y_abs))[0:num_features]
top_5_amplitude = Y_abs[largest_amplitudes]
top_5_frequencies = frequencies[largest_amplitudes]
top_5_phases = Y_phase[largest_amplitudes]
fft_features = top_5_amplitude.tolist()+top_5_frequencies.tolist()+top_5_phases.tolist()
amp_keys = ['Amplitude '+str(i) for i in range(1,num_features+1)]
freq_keys = ['Frequency '+str(i) for i in range(1,num_features+1)]
phase_keys = ['Phase '+str(i) for i in range(1,num_features+1)]
fft_keys = amp_keys+freq_keys+phase_keys
fft_dict = {fft_keys[i]:fft_features[i] for i in range(len(fft_keys))}
fft_data = pd.DataFrame(fft_features).T
fft_data.columns = fft_keys
return fft_dict, fft_data
所以如果輸入信號y和(可選):
- x或時間數(shù)組,考慮的特征數(shù)量(或峰值),最大頻率
輸出:
x = np.linspace(-8*np.pi, 8*np.pi, 10000)
y = np.sin(x) + 0.4*np.cos(2*x) + 2*np.sin(3.2*x)
extract_fft_features(x=x,y=y,num_features=3)[1]
如果想使用小波(不是正弦/余弦)*來提取特征,可以進(jìn)行小波變換。需要安裝這個庫:
pip install PyWavelets
然后運行這個:
import numpy as np
import pywt
import pandas as pd
def extract_wavelet_features(y, wavelet='db4', level=3, num_features=5):
y = y - np.mean(y) # Remove the mean
# Perform the Discrete Wavelet Transform
coeffs = pywt.wavedec(y, wavelet, level=level)
# Flatten the list of coefficients into a single array
coeffs_flat = np.hstack(coeffs)
# Get the absolute values of the coefficients
coeffs_abs = np.abs(coeffs_flat)
# Find the indices of the largest coefficients
largest_coeff_indices = np.flip(np.argsort(coeffs_abs))[0:num_features]
# Extract the largest coefficients as features
top_coeffs = coeffs_flat[largest_coeff_indices]
# Generate feature names for the wavelet features
feature_keys = ['Wavelet Coeff ' + str(i+1) for i in range(num_features)]
# Create a dictionary for the features
wavelet_dict = {feature_keys[i]: top_coeffs[i] for i in range(num_features)}
# Create a DataFrame for the features
wavelet_data = pd.DataFrame(top_coeffs).T
wavelet_data.columns = feature_keys
return wavelet_dict, wavelet_data
# Example usage:
wavelet_dict, wavelet_data = extract_wavelet_features(y)
wavelet_data
3、基于統(tǒng)計的特征提取
特征提取的另一種方法是依靠統(tǒng)計學(xué)。 給定一個信號,有多種方法可以從中提取一些統(tǒng)計信息。從簡單到復(fù)雜,這是我們可以提取的信息列表:
- 平均值,只是信號的總和除以信號的時間步數(shù)。
- 方差,即信號與平均值的偏離程度:
- 偏度和峰度。這些是測試你的時間序列分布"不是高斯分布"程度的指標(biāo)。偏度描述它有多不對稱,峰度描述它有多"拖尾"。
- 分位數(shù):這些是將時間序列分成具有概率范圍的區(qū)間的值。例如,0.25分位數(shù)值為10意味著你的時間序列中25%的值低于10,剩余75%大于10
- 自相關(guān)。這基本上告訴你時間序列有多"模式化",意味著時間序列中模式的強(qiáng)度?;蛘哌@個指標(biāo)表示時間序列值與其自身過去值的相關(guān)程度。
- 熵:表示時間序列的復(fù)雜性或不可預(yù)測性。
這些屬性中的每一個都可以用一行代碼實現(xiàn):
import numpy as np
import pandas as pd
from scipy.stats import skew, kurtosis
def extract_statistical_features(y):
def calculate_entropy(y):
# Ensure y is positive and normalized
y = np.abs(y)
y_sum = np.sum(y)
# Avoid division by zero
if y_sum == 0:
return 0
# Normalize the signal
p = y / y_sum
# Calculate entropy
entropy_value = -np.sum(p * np.log2(p + 1e-12)) # Add a small value to avoid log(0)
return entropy_value
# Remove the mean to center the data
y_centered = y - np.mean(y)
y = y+np.max(np.abs(y))*10**-4
# Statistical features
mean_value = np.mean(y)
variance_value = np.var(y)
skewness_value = skew(y)
kurtosis_value = kurtosis(y)
autocorrelation_value = np.correlate(y_centered, y_centered, mode='full')[len(y) - 1] / len(y)
quantiles = np.percentile(y, [25, 50, 75])
entropy_value = calculate_entropy(y) # Add a small value to avoid log(0)
# Create a dictionary of features
statistical_dict = {
'Mean': mean_value,
'Variance': variance_value,
'Skewness': skewness_value,
'Kurtosis': kurtosis_value,
'Autocorrelation': autocorrelation_value,
'Quantile_25': quantiles[0],
'Quantile_50': quantiles[1],
'Quantile_75': quantiles[2],
'Entropy': entropy_value
}
# Convert to DataFrame for easy visualization and manipulation
statistical_data = pd.DataFrame([statistical_dict])
return statistical_dict, statistical_data
y = np.sin(x) + 0.4*np.cos(2*x) + 2*np.sin(3.2*x)
wavelet_dict, wavelet_data = extract_statistical_features(y)
wavelet_data
4、基于時間的特征提取器
這部分將專注于如何通過僅提取時間特征來提取時間序列的信息。比如提取峰值和谷值的信息。我們將使用來自SciPy的find_peaks函數(shù)。
import numpy as np
from scipy.signal import find_peaks
def extract_peaks_and_valleys(y, N=10):
# Find peaks and valleys
peaks, _ = find_peaks(y)
valleys, _ = find_peaks(-y)
# Combine peaks and valleys
all_extrema = np.concatenate((peaks, valleys))
all_values = np.concatenate((y[peaks], -y[valleys]))
# Sort by absolute amplitude (largest first)
sorted_indices = np.argsort(-np.abs(all_values))
sorted_extrema = all_extrema[sorted_indices]
sorted_values = all_values[sorted_indices]
# Select the top N extrema
top_extrema = sorted_extrema[:N]
top_values = sorted_values[:N]
# Pad with zeros if fewer than N extrema are found
if len(top_extrema) < N:
padding = 10 - len(top_extrema)
top_extrema = np.pad(top_extrema, (0, padding), 'constant', constant_values=0)
top_values = np.pad(top_values, (0, padding), 'constant', constant_values=0)
# Prepare the features
features = []
for i in range(N):
features.append(top_values[i])
features.append(top_extrema[i])
# Create a dictionary of features
feature_dict = {f'peak_{i+1}': features[2*i] for i in range(N)}
feature_dict.update({f'loc_{i+1}': features[2*i+1] for i in range(N)})
return feature_dict,pd.DataFrame([feature_dict])
# Example usage:
x = np.linspace(-2*np.pi,2*np.pi,1000)
y = np.sin(x) + 0.4*np.cos(2*x) + 2*np.sin(3.2*x)
features = extract_peaks_and_valleys(y, N=10)
features[1]
我們選擇N = 10個峰值,但信號實際上只有M = 4個峰值,剩余的6個位置和峰值振幅將為0。
5、使用哪種方法?
我們已經(jīng)看到了4種不同類別的方法。
我們應(yīng)該使用哪一種?
如果你有一個基于領(lǐng)域的特征提取,那總是最好的選擇:如果實驗的物理或問題的先驗知識是清晰的,你應(yīng)該依賴于它并將這些特征視為最重要的,甚至可能將它們視為唯一的特征。
就頻率、統(tǒng)計和基于時間的特征而言,在可以將它們一起使用。將這些特征添加到你的數(shù)據(jù)集中,然后看看其中的一些是否有幫助,沒有幫助就把他們刪除掉,這是一個測試的過程,就像超參數(shù)搜索一樣,我們無法確定好壞,所以只能靠最后的結(jié)果來判斷。