使用Transformer 模型進(jìn)行時間序列預(yù)測的Pytorch代碼示例
時間序列預(yù)測是一個經(jīng)久不衰的主題,受自然語言處理領(lǐng)域的成功啟發(fā),transformer模型也在時間序列預(yù)測有了很大的發(fā)展。本文可以作為學(xué)習(xí)使用Transformer 模型的時間序列預(yù)測的一個起點(diǎn)。
數(shù)據(jù)集 這里我們直接使用kaggle中的 Store Sales — Time Series Forecasting作為數(shù)據(jù)。這個比賽需要預(yù)測54家商店中各種產(chǎn)品系列未來16天的銷售情況,總共創(chuàng)建1782個時間序列。數(shù)據(jù)從2013年1月1日至2017年8月15日,目標(biāo)是預(yù)測接下來16天的銷售情況。雖然為了簡潔起見,我們做了簡化處理,作為模型的輸入包含20列中的3,029,400條數(shù)據(jù),。每行的關(guān)鍵列為' store_nbr '、' family '和' date '。數(shù)據(jù)分為三類變量:
1、截止到最后一次訓(xùn)練數(shù)據(jù)日期(2017年8月15日)之前已知的與時間相關(guān)的變量。這些變量包括數(shù)字變量,如“銷售額”,表示某一產(chǎn)品系列在某家商店的銷售額;“transactions”,一家商店的交易總數(shù);' store_sales ',該商店的總銷售額;' family_sales '表示該產(chǎn)品系列的總銷售額。
2、訓(xùn)練截止日期(2017年8月31日)之前已知,包括“onpromotion”(產(chǎn)品系列中促銷產(chǎn)品的數(shù)量)和“dcoilwtico”等變量。這些數(shù)字列由' holiday '列補(bǔ)充,它表示假日或事件的存在,并被分類編碼為整數(shù)。此外,' time_idx '、' week_day '、' month_day '、' month '和' year '列提供時間上下文,也編碼為整數(shù)。雖然我們的模型是只有編碼器的,但已經(jīng)添加了16天移動值“onpromotion”和“dcoilwtico”,以便在沒有解碼器的情況下包含未來的信息。
3、靜態(tài)協(xié)變量隨著時間的推移保持不變,包括諸如“store_nbr”、“family”等標(biāo)識符,以及“city”、“state”、“type”和“cluster”等分類變量(詳細(xì)說明了商店的特征),所有這些變量都是整數(shù)編碼的。
我們最后生成的df名為' data_all ',結(jié)構(gòu)如下:
categorical_covariates = ['time_idx','week_day','month_day','month','year','holiday']
categorical_covariates_num_embeddings = []
for col in categorical_covariates:
data_all[col] = data_all[col].astype('category').cat.codes
categorical_covariates_num_embeddings.append(data_all[col].nunique())
categorical_static = ['store_nbr','city','state','type','cluster','family_int']
categorical_static_num_embeddings = []
for col in categorical_static:
data_all[col] = data_all[col].astype('category').cat.codes
categorical_static_num_embeddings.append(data_all[col].nunique())
numeric_covariates = ['sales','dcoilwtico','dcoilwtico_future','onpromotion','onpromotion_future','store_sales','transactions','family_sales']
target_idx = np.where(np.array(numeric_covariates)=='sales')[0][0]
在將數(shù)據(jù)轉(zhuǎn)換為適合我的PyTorch模型的張量之前,需要將其分為訓(xùn)練集和驗(yàn)證集。窗口大小是一個重要的超參數(shù),表示每個訓(xùn)練樣本的序列長度。此外,' num_val '表示使用的驗(yàn)證折數(shù),在此上下文中設(shè)置為2。將2013年1月1日至2017年6月28日的觀測數(shù)據(jù)指定為訓(xùn)練數(shù)據(jù)集,以2017年6月29日至2017年7月14日和2017年7月15日至2017年7月30日作為驗(yàn)證區(qū)間。
同時還進(jìn)行了數(shù)據(jù)的縮放,完整代碼如下:
def dataframe_to_tensor(series,numeric_covariates,categorical_covariates,categorical_static,target_idx):
numeric_cov_arr = np.array(series[numeric_covariates].values.tolist())
category_cov_arr = np.array(series[categorical_covariates].values.tolist())
static_cov_arr = np.array(series[categorical_static].values.tolist())
x_numeric = torch.tensor(numeric_cov_arr,dtype=torch.float32).transpose(2,1)
x_numeric = torch.log(x_numeric+1e-5)
x_category = torch.tensor(category_cov_arr,dtype=torch.long).transpose(2,1)
x_static = torch.tensor(static_cov_arr,dtype=torch.long)
y = torch.tensor(numeric_cov_arr[:,target_idx,:],dtype=torch.float32)
return x_numeric, x_category, x_static, y
window_size = 16
forecast_length = 16
num_val = 2
val_max_date = '2017-08-15'
train_max_date = str((pd.to_datetime(val_max_date) - pd.Timedelta(days=window_size*num_val+forecast_length)).date())
train_final = data_all[data_all['date']<=train_max_date]
val_final = data_all[(data_all['date']>train_max_date)&(data_all['date']<=val_max_date)]
train_series = train_final.groupby(categorical_static+['family']).agg(list).reset_index()
val_series = val_final.groupby(categorical_static+['family']).agg(list).reset_index()
x_numeric_train_tensor, x_category_train_tensor, x_static_train_tensor, y_train_tensor = dataframe_to_tensor(train_series,numeric_covariates,categorical_covariates,categorical_static,target_idx)
x_numeric_val_tensor, x_category_val_tensor, x_static_val_tensor, y_val_tensor = dataframe_to_tensor(val_series,numeric_covariates,categorical_covariates,categorical_static,target_idx)
數(shù)據(jù)加載器 在數(shù)據(jù)加載時,需要將每個時間序列從窗口范圍內(nèi)的隨機(jī)索引開始劃分為時間塊,以確保模型暴露于不同的序列段。
為了減少偏差還引入了一個額外的超參數(shù)設(shè)置,它不是隨機(jī)打亂數(shù)據(jù),而是根據(jù)塊的開始時間對數(shù)據(jù)集進(jìn)行排序。然后數(shù)據(jù)被分成五部分——反映了我們五年的數(shù)據(jù)集——每一部分都是內(nèi)部打亂的,這樣最后一批數(shù)據(jù)將包括去年的觀察結(jié)果,但還是隨機(jī)的。模型的最終梯度更新受到最近一年的影響,理論上可以改善最近時期的預(yù)測。
def divide_shuffle(df,div_num):
space = df.shape[0]//div_num
division = np.arange(0,df.shape[0],space)
return pd.concat([df.iloc[division[i]:division[i]+space,:].sample(frac=1) for i in range(len(division))])
def create_time_blocks(time_length,window_size):
start_idx = np.random.randint(0,window_size-1)
end_idx = time_length-window_size-16-1
time_indices = np.arange(start_idx,end_idx+1,window_size)[:-1]
time_indices = np.append(time_indices,end_idx)
return time_indices
def data_loader(x_numeric_tensor, x_category_tensor, x_static_tensor, y_tensor, batch_size, time_shuffle):
num_series = x_numeric_tensor.shape[0]
time_length = x_numeric_tensor.shape[1]
index_pd = pd.DataFrame({'serie_idx':range(num_series)})
index_pd['time_idx'] = [create_time_blocks(time_length,window_size) for n in range(index_pd.shape[0])]
if time_shuffle:
index_pd = index_pd.explode('time_idx')
index_pd = index_pd.sample(frac=1)
else:
index_pd = index_pd.explode('time_idx').sort_values('time_idx')
index_pd = divide_shuffle(index_pd,5)
indices = np.array(index_pd).astype(int)
for batch_idx in np.arange(0,indices.shape[0],batch_size):
cur_indices = indices[batch_idx:batch_idx+batch_size,:]
x_numeric = torch.stack([x_numeric_tensor[n[0],n[1]:n[1]+window_size,:] for n in cur_indices])
x_category = torch.stack([x_category_tensor[n[0],n[1]:n[1]+window_size,:] for n in cur_indices])
x_static = torch.stack([x_static_tensor[n[0],:] for n in cur_indices])
y = torch.stack([y_tensor[n[0],n[1]+window_size:n[1]+window_size+forecast_length] for n in cur_indices])
yield x_numeric.to(device), x_category.to(device), x_static.to(device), y.to(device)
def val_loader(x_numeric_tensor, x_category_tensor, x_static_tensor, y_tensor, batch_size, num_val):
num_time_series = x_numeric_tensor.shape[0]
for i in range(num_val):
for batch_idx in np.arange(0,num_time_series,batch_size):
x_numeric = x_numeric_tensor[batch_idx:batch_idx+batch_size,window_size*i:window_size*(i+1),:]
x_category = x_category_tensor[batch_idx:batch_idx+batch_size,window_size*i:window_size*(i+1),:]
x_static = x_static_tensor[batch_idx:batch_idx+batch_size]
y_val = y_tensor[batch_idx:batch_idx+batch_size,window_size*(i+1):window_size*(i+1)+forecast_length]
yield x_numeric.to(device), x_category.to(device), x_static.to(device), y_val.to(device)
模型
我們這里通過Pytorch來簡單的實(shí)現(xiàn)《Attention is All You Need》(2017)2中描述的Transformer架構(gòu)。因?yàn)槭菚r間序列預(yù)測,所以注意力機(jī)制中不需要因果關(guān)系,也就是沒有對注意塊應(yīng)用進(jìn)行遮蔽。
從輸入開始:分類特征通過嵌入層傳遞,以密集的形式表示它們,然后送到Transformer塊。多層感知器(MLP)接受最終編碼輸入來產(chǎn)生預(yù)測。嵌入維數(shù)、每個Transformer塊中的注意頭數(shù)和dropout概率是模型的主要超參數(shù)。堆疊多個Transformer塊由' num_blocks '超參數(shù)控制。
下面是單個Transformer塊的實(shí)現(xiàn)和整體預(yù)測模型:
class transformer_block(nn.Module):
def __init__(self,embed_size,num_heads):
super(transformer_block, self).__init__()
self.attention = nn.MultiheadAttention(embed_size, num_heads, batch_first=True)
self.fc = nn.Sequential(nn.Linear(embed_size, 4 * embed_size),
nn.LeakyReLU(),
nn.Linear(4 * embed_size, embed_size))
self.dropout = nn.Dropout(drop_prob)
self.ln1 = nn.LayerNorm(embed_size, eps=1e-6)
self.ln2 = nn.LayerNorm(embed_size, eps=1e-6)
def forward(self, x):
attn_out, _ = self.attention(x, x, x, need_weights=False)
x = x + self.dropout(attn_out)
x = self.ln1(x)
fc_out = self.fc(x)
x = x + self.dropout(fc_out)
x = self.ln2(x)
return x
class transformer_forecaster(nn.Module):
def __init__(self,embed_size,num_heads,num_blocks):
super(transformer_forecaster, self).__init__()
num_len = len(numeric_covariates)
self.embedding_cov = nn.ModuleList([nn.Embedding(n,embed_size-num_len) for n in categorical_covariates_num_embeddings])
self.embedding_static = nn.ModuleList([nn.Embedding(n,embed_size-num_len) for n in categorical_static_num_embeddings])
self.blocks = nn.ModuleList([transformer_block(embed_size,num_heads) for n in range(num_blocks)])
self.forecast_head = nn.Sequential(nn.Linear(embed_size, embed_size*2),
nn.LeakyReLU(),
nn.Dropout(drop_prob),
nn.Linear(embed_size*2, embed_size*4),
nn.LeakyReLU(),
nn.Linear(embed_size*4, forecast_length),
nn.ReLU())
def forward(self, x_numeric, x_category, x_static):
tmp_list = []
for i,embed_layer in enumerate(self.embedding_static):
tmp_list.append(embed_layer(x_static[:,i]))
categroical_static_embeddings = torch.stack(tmp_list).mean(dim=0).unsqueeze(1)
tmp_list = []
for i,embed_layer in enumerate(self.embedding_cov):
tmp_list.append(embed_layer(x_category[:,:,i]))
categroical_covariates_embeddings = torch.stack(tmp_list).mean(dim=0)
T = categroical_covariates_embeddings.shape[1]
embed_out = (categroical_covariates_embeddings + categroical_static_embeddings.repeat(1,T,1))/2
x = torch.concat((x_numeric,embed_out),dim=-1)
for block in self.blocks:
x = block(x)
x = x.mean(dim=1)
x = self.forecast_head(x)
return x
我們修改后的transformer架構(gòu)如下圖所示:
模型接受三個獨(dú)立的輸入張量:數(shù)值特征、分類特征和靜態(tài)特征。對分類和靜態(tài)特征嵌入進(jìn)行平均,并與數(shù)字特征組合形成具有形狀(batch_size, window_size, embedding_size)的張量,為Transformer塊做好準(zhǔn)備。這個復(fù)合張量還包含嵌入的時間變量,提供必要的位置信息。
Transformer塊提取順序信息,然后將結(jié)果張量沿著時間維度聚合,將其傳遞到MLP中以生成最終預(yù)測(batch_size, forecast_length)。這個比賽采用均方根對數(shù)誤差(RMSLE)作為評價指標(biāo),公式為:
鑒于預(yù)測經(jīng)過對數(shù)轉(zhuǎn)換,預(yù)測低于-1的負(fù)銷售額(這會導(dǎo)致未定義的錯誤)需要進(jìn)行處理,所以為了避免負(fù)的銷售預(yù)測和由此產(chǎn)生的NaN損失值,在MLP層以后增加了一層ReLU激活確保非負(fù)預(yù)測。
class RMSLELoss(nn.Module):
def __init__(self):
super().__init__()
self.mse = nn.MSELoss()
def forward(self, pred, actual):
return torch.sqrt(self.mse(torch.log(pred + 1), torch.log(actual + 1)))
訓(xùn)練和驗(yàn)證
訓(xùn)練模型時需要設(shè)置幾個超參數(shù):窗口大小、是否打亂時間、嵌入大小、頭部數(shù)量、塊數(shù)量、dropout、批大小和學(xué)習(xí)率。以下配置是有效的,但不保證是最好的:
num_epoch = 1000
min_val_loss = 999
num_blocks = 1
embed_size = 500
num_heads = 50
batch_size = 128
learning_rate = 3e-4
time_shuffle = False
drop_prob = 0.1
model = transformer_forecaster(embed_size,num_heads,num_blocks).to(device)
criterion = RMSLELoss()
optimizer = torch.optim.Adam(model.parameters(),lr=learning_rate)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.5)
這里使用adam優(yōu)化器和學(xué)習(xí)率調(diào)度,以便在訓(xùn)練期間逐步調(diào)整學(xué)習(xí)率。
for epoch in range(num_epoch):
batch_loader = data_loader(x_numeric_train_tensor, x_category_train_tensor, x_static_train_tensor, y_train_tensor, batch_size, time_shuffle)
train_loss = 0
counter = 0
model.train()
for x_numeric, x_category, x_static, y in batch_loader:
optimizer.zero_grad()
preds = model(x_numeric, x_category, x_static)
loss = criterion(preds, y)
train_loss += loss.item()
counter += 1
loss.backward()
optimizer.step()
train_loss = train_loss/counter
print(f'Epoch {epoch} training loss: {train_loss}')
model.eval()
val_batches = val_loader(x_numeric_val_tensor, x_category_val_tensor, x_static_val_tensor, y_val_tensor, batch_size, num_val)
val_loss = 0
counter = 0
for x_numeric_val, x_category_val, x_static_val, y_val in val_batches:
with torch.no_grad():
preds = model(x_numeric_val,x_category_val,x_static_val)
loss = criterion(preds,y_val).item()
val_loss += loss
counter += 1
val_loss = val_loss/counter
print(f'Epoch {epoch} validation loss: {val_loss}')
if val_loss<min_val_loss:
print('saved...')
torch.save(model,data_folder+'best.model')
min_val_loss = val_loss
scheduler.step()
結(jié)果
訓(xùn)練后,表現(xiàn)最好的模型的訓(xùn)練損失為0.387,驗(yàn)證損失為0.457。當(dāng)應(yīng)用于測試集時,該模型的RMSLE為0.416,比賽排名為第89位(前10%)。
更大的嵌入和更多的注意力頭似乎可以提高性能,但最好的結(jié)果是用一個單獨(dú)的Transformer 實(shí)現(xiàn)的,這表明在有限的數(shù)據(jù)下,簡單是優(yōu)點(diǎn)。當(dāng)放棄整體打亂而選擇局部打亂時,效果有所改善;引入輕微的時間偏差提高了預(yù)測的準(zhǔn)確性。