Transformer sequence prediction example
This post draws on: 【python量化】將Transformer模型用于股票價格預測 (螞蟻愛Python's blog on CSDN).
一、Transformer architecture overview (diagram in the original post, not reproduced here)
二、Transformer components
1. positional_encoder
X -> [positional_encoding (embedding + position)] -> X'
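A minimal sketch of the sinusoidal encoding used later in this post (it mirrors the PositionalEncoding module in section 四; the sizes in the last two lines are illustrative):

import math
import torch

def positional_encoding(max_len, d_model):
    # pe[pos, 2i] = sin(pos / 10000**(2i/d_model)); pe[pos, 2i+1] = cos(...)
    pe = torch.zeros(max_len, d_model)
    position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
    div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
    pe[:, 0::2] = torch.sin(position * div_term)
    pe[:, 1::2] = torch.cos(position * div_term)
    return pe.unsqueeze(1)  # (max_len, 1, d_model): the middle dim broadcasts over the batch

x = torch.rand(20, 2, 250)                   # (S, N, E)
x_prime = x + positional_encoding(20, 250)   # X -> X', shape unchanged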
2. multi_head_attention_forward
Inputs:
query: (L, N, E)
key: (S, N, E)
value: (S, N, E)
Outputs:
attn_output: (L, N, E)
attn_output_weights: (N, L, S)
Here L is the target sequence length, S the source sequence length, N the batch size, and E the embedding dimension.
3. MultiheadAttention
Inputs:
embed_dim
num_heads
multihead_attn = nn.MultiheadAttention(embed_dim, num_heads)
attn_output, attn_output_weights = multihead_attn(query, key, value)
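A runnable shape check of the contract above (the tensor sizes here are made up for illustration):

import torch
import torch.nn as nn

embed_dim, num_heads = 512, 8
multihead_attn = nn.MultiheadAttention(embed_dim, num_heads)
query = torch.rand(10, 32, embed_dim)  # (L, N, E)
key = torch.rand(15, 32, embed_dim)    # (S, N, E)
value = torch.rand(15, 32, embed_dim)  # (S, N, E)
attn_output, attn_output_weights = multihead_attn(query, key, value)
print(attn_output.shape)           # torch.Size([10, 32, 512]), i.e. (L, N, E)
print(attn_output_weights.shape)   # torch.Size([32, 10, 15]), i.e. (N, L, S)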
4. TransformerEncoderLayer
Inputs:
d_model
nhead
5. TransformerEncoder
Inputs:
encoder_layer
num_layers
encoder_layer = nn.TransformerEncoderLayer(d_model=512, nhead=8)
transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=6)
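Continuing the snippet above (with torch and torch.nn imported as before), the stacked encoder preserves the (S, N, E) shape; the batch size here is illustrative:

src = torch.rand(10, 32, 512)  # (S, N, E)
out = transformer_encoder(src)
print(out.shape)               # torch.Size([10, 32, 512]): shape unchanged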
6. TransformerDecoderLayer
Inputs:
d_model
nhead
7. TransformerDecoder
Inputs:
decoder_layer
num_layers
decoder_layer = nn.TransformerDecoderLayer(d_model=512, nhead=8)
transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers=6)
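Unlike the encoder, the decoder's forward takes two arguments: the target sequence and the encoder output (the "memory"). Continuing the snippet above with illustrative sizes:

memory = torch.rand(10, 32, 512)        # encoder output, (S, N, E)
tgt = torch.rand(20, 32, 512)           # target sequence, (T, N, E)
out = transformer_decoder(tgt, memory)
print(out.shape)                        # torch.Size([20, 32, 512]), i.e. (T, N, E)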
8. Transformer
Inputs:
nhead
num_encoder_layers
transformer_model = nn.Transformer(nhead=16, num_encoder_layers=12)
src = torch.rand((10, 32, 512))
tgt = torch.rand((20, 32, 512))
out = transformer_model(src, tgt)  # (20, 32, 512): the output follows the target's shape
三、A sequence prediction example with the Transformer
1. The input series read from the CSV has shape (126, 1).
2. It is split 70/30 into a training set of shape (88,) and a test set of shape (38,).
3. A sliding window of length input_window = 20 turns the series into (sequence, label) pairs, where each label is the sequence shifted forward by output_window = 1 (see the sketch after this list).
The resulting training tensor has shape (67, 2, 20):
67: number of window positions (88 - 20 = 68 slides, minus the last output_window = 1);
2: sequence + label;
20: input window length.
The test tensor has shape (17, 2, 20):
17: number of window positions (38 - 20 = 18 slides, minus 1);
2: sequence + label;
20: input window length.
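A minimal sketch of this windowing, with a dummy ramp standing in for the 88 real training values (it mirrors create_inout_sequences in section 四):

import numpy as np
import torch

input_window, output_window = 20, 1
train_data = np.arange(88, dtype=np.float32)  # stand-in for the 88 scaled training values

pairs = []
for i in range(len(train_data) - input_window):  # 88 - 20 = 68 window positions
    seq = train_data[i: i + input_window]
    label = train_data[i + output_window: i + input_window + output_window]  # shifted by 1
    pairs.append((seq, label))

train_sequence = torch.FloatTensor(np.array(pairs))[:-output_window]  # drop the last pair
print(train_sequence.shape)  # torch.Size([67, 2, 20])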
4. input and target are reshaped into the (S, N, E) layout the model expects: S = 20 (window length), N = 2 (batch_size), E = 1 (one feature per time step); see the sketch below.
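Continuing the windowing sketch above, this is essentially what get_batch in section 四 does:

batch = train_sequence[0:2]                       # two (sequence, label) pairs: (2, 2, 20)
seqs = torch.stack([item[0] for item in batch])   # (N, S) = (2, 20)
inp = torch.stack(seqs.chunk(input_window, 1))    # (S, N, E) = (20, 2, 1)
print(inp.shape)  # torch.Size([20, 2, 1])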
5. During training, a square subsequent attention mask of shape (20, 20) is generated (see the sketch below).
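The mask is 0 on and below the diagonal and -inf above it, so position i can only attend to positions <= i. A small check (size 4 shown for readability; the model uses 20):

import torch

def generate_square_subsequent_mask(sz):
    mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
    return mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, 0.0)

print(generate_square_subsequent_mask(4))
# tensor([[0., -inf, -inf, -inf],
#         [0., 0., -inf, -inf],
#         [0., 0., 0., -inf],
#         [0., 0., 0., 0.]])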
6. pos_encoder
src (20, 2, 1) + positional_encoding (20, 1, 250) => (20, 2, 250)
This works by broadcasting: the encoding's batch dimension expands from 1 to 2 while the input's feature dimension expands from 1 to 250, which is also how the scalar series is lifted to feature_size = 250 (see the check below).
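A quick check of the broadcast, with random tensors in place of the real data:

import torch

src = torch.rand(20, 2, 1)    # (S, N, 1): one feature per time step
pe = torch.rand(20, 1, 250)   # (S, 1, d_model) slice of the encoding buffer
print((src + pe).shape)       # torch.Size([20, 2, 250])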
7. transformer_encoder: (20, 2, 250) -> (20, 2, 250)
8. decoder: (20, 2, 250) -> (20, 2, 1), done by a single nn.Linear(250, 1) layer
9. Loss computation
Because of the mask, all values after the current position are invisible when predicting the next one: the output at position t is a prediction of the value at t + 1 made only from positions <= t. Since the target sequence is the input shifted by output_window = 1, the loss is simply the MSE between the model output and the shifted sequence over all 20 positions, as sketched below.
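A minimal sketch, with random tensors standing in for the real output and targets:

import torch
import torch.nn as nn

criterion = nn.MSELoss()
output = torch.rand(20, 2, 1)      # model output: a next-value prediction at every position
targets = torch.rand(20, 2, 1)     # the input sequence shifted forward by output_window
loss = criterion(output, targets)  # mean squared error over all positions and batch items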
四、Complete code (with the errors in the original post fixed)
import torch
import torch.nn as nn
import numpy as np
import time
import math
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
import pandas as pd

torch.manual_seed(0)
np.random.seed(0)

input_window = 20   # length of each input sequence
output_window = 1   # prediction horizon: labels are the inputs shifted by 1
batch_size = 2
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)  # (max_len, 1, d_model)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (seq_len, batch, features); the batch and feature dims broadcast
        return x + self.pe[:x.size(0), :]

class TransAm(nn.Module):
    def __init__(self, feature_size=250, num_layers=1, dropout=0.1):
        super(TransAm, self).__init__()
        self.model_type = 'Transformer'
        self.src_mask = None
        self.pos_encoder = PositionalEncoding(feature_size)
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=feature_size, nhead=10, dropout=dropout)
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)
        self.decoder = nn.Linear(feature_size, 1)
        self.init_weights()

    def init_weights(self):
        initrange = 0.1
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-initrange, initrange)

    def _generate_square_subsequent_mask(self, sz):
        mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask

    def forward(self, src):
        # regenerate the mask only when its size no longer matches the input
        if self.src_mask is None or self.src_mask.shape[0] != len(src):
            device = src.device
            mask = self._generate_square_subsequent_mask(len(src)).to(device)
            self.src_mask = mask
        src = self.pos_encoder(src)
        output = self.transformer_encoder(src, self.src_mask)
        output = self.decoder(output)
        return output

def create_inout_sequences(input_data, tw):
    inout_seq = []
    L = len(input_data)
    for i in range(L - tw):
        train_seq = input_data[i: i + tw]
        train_label = input_data[i + output_window: i + tw + output_window]
        inout_seq.append((train_seq, train_label))
    return torch.FloatTensor(inout_seq)

def get_data():
    series = pd.read_csv("D:\\temp\\0001_daily.csv", usecols=[0])
    scaler = MinMaxScaler(feature_range=(-1, 1))
    series = scaler.fit_transform(series.values.reshape(-1, 1)).reshape(-1)
    train_samples = int(0.7 * len(series))
    train_data = series[:train_samples]
    test_data = series[train_samples:]
    train_sequence = create_inout_sequences(train_data, input_window)
    train_sequence = train_sequence[:-output_window]
    test_data = create_inout_sequences(test_data, input_window)
    test_data = test_data[:-output_window]
    return train_sequence.to(device), test_data.to(device)

def get_batch(source, i, batch_size):
    seq_len = min(batch_size, len(source) - 1 - i)
    data = source[i: i + seq_len]
    # (batch, window) -> (window, batch, 1), i.e. (S, N, E)
    input = torch.stack(torch.stack([item[0] for item in data]).chunk(input_window, 1))
    target = torch.stack(torch.stack([item[1] for item in data]).chunk(input_window, 1))
    return input, target

def train(train_data):
    model.train()
    total_loss = 0
    for batch_index, i in enumerate(range(0, len(train_data) - 1, batch_size)):
        start_time = time.time()
        data, targets = get_batch(train_data, i, batch_size)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, targets)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.7)  # in-place gradient clipping
        optimizer.step()
        total_loss += loss.item()
        log_interval = int(len(train_data) / batch_size / 5)
        if batch_index % log_interval == 0 and batch_index > 0:
            cur_loss = total_loss / log_interval
            elapsed = time.time() - start_time
            print('| epoch {:3d} | {:5d}/{:5d} batches | lr {:02.6f} | {:5.2f} ms | loss {:5.5f} | ppl {:8.2f}'
                  .format(epoch, batch_index, len(train_data) // batch_size, scheduler.get_last_lr()[0],
                          elapsed * 1000 / log_interval, cur_loss, math.exp(cur_loss)))
            total_loss = 0  # reset so the logged loss is a per-interval average

def evaluate(eval_model, data_source):
    eval_model.eval()
    total_loss = 0
    eval_batch_size = 1000
    with torch.no_grad():
        for i in range(0, len(data_source) - 1, eval_batch_size):
            data, targets = get_batch(data_source, i, eval_batch_size)
            output = eval_model(data)
            total_loss += len(data[0]) * criterion(output, targets).cpu().item()
    return total_loss / len(data_source)

def plot_and_loss(eval_model, data_source, epoch):
    eval_model.eval()
    total_loss = 0.
    test_result = torch.Tensor(0)
    truth = torch.Tensor(0)
    with torch.no_grad():
        for i in range(0, len(data_source) - 1):
            data, target = get_batch(data_source, i, 1)
            output = eval_model(data)
            total_loss += criterion(output, target).item()
            test_result = torch.cat((test_result, output[-1].view(-1).cpu()), 0)
            truth = torch.cat((truth, target[-1].view(-1).cpu()), 0)
    plt.plot(test_result, color='red')
    plt.plot(truth, color='blue')
    plt.grid(True, which='both')
    plt.axhline(y=0, color='k')
    plt.savefig('transformer-epoch%d.png' % epoch)
    plt.close()
    return total_loss / i

train_data, val_data = get_data()
model = TransAm().to(device)
criterion = nn.MSELoss()
lr = 0.005
optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1, gamma=0.95)
epochs = 60

for epoch in range(1, epochs + 1):
    epoch_start_time = time.time()
    train(train_data)
    if epoch % 10 == 0:
        val_loss = plot_and_loss(model, val_data, epoch)
    else:
        val_loss = evaluate(model, val_data)
    print('-' * 89)
    print('| end of epoch {:3d} | time: {:5.2f}s | valid loss {:5.5f} | valid ppl {:8.2f}'
          .format(epoch, (time.time() - epoch_start_time), val_loss, math.exp(val_loss)))
    print('-' * 89)
    scheduler.step()

Model usage example:
Save the model:

torch.save(model.state_dict(), 'transformer.pth.tar')

Load and use the model:

import os

model_path = os.path.join(os.getcwd(), 'transformer.pth.tar')
model_dict = torch.load(model_path)
model.load_state_dict(model_dict)

test_data = np.array([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0,
                      0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1, 0])
test_data = torch.from_numpy(test_data).view(-1, 1, 1).to(device).float()  # (seq_len, batch_size, embedding)
test_output = model(test_data)
print(test_output[-1].item())  # the prediction for the step after the last input value