Stock Prediction with Recurrent Neural Networks: RNN, LSTM, and GRU
TensorFlow: Recurrent Neural Networks (RNN)
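- Recurrent cells
  - Describing a recurrent cell in TensorFlow

- Recurrent neural networks
  - Describing a recurrent neural network in TensorFlow

- The recurrent computation process
  - Input one letter, predict the next letter
  - Input four consecutive letters, predict the next letter

- Embedding encoding
  - Describing Embedding encoding in TensorFlow
  - Replacing one-hot codes with Embedding: one-character input
  - Replacing one-hot codes with Embedding: four-character input

- Stock prediction with a recurrent neural network
  - Downloading the stock data
  - Stock prediction

- Long short-term memory networks (LSTM)
  - Describing the LSTM layer in TensorFlow
  - Stock prediction with LSTM

- GRU networks
  - Describing the GRU layer in TensorFlow
  - Stock prediction with GRU
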
Notes taken while watching the Peking University TensorFlow 2.0 video course:
https://www.bilibili.com/video/BV1B7411L7Qt?p=25
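Recurrent cells
A recurrent cell has memory: by sharing the same parameters across time steps, it extracts information from a time series.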
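
To make the parameter sharing concrete, here is a minimal NumPy sketch of one recurrent cell unrolled over time. It assumes the standard simple-RNN update h_t = tanh(x_t W_xh + h_(t-1) W_hh + b_h), which these notes do not write out; the names `rnn_forward`, `w_xh`, `w_hh`, and `b_h` are illustrative and are not TensorFlow API.

```python
import numpy as np


def rnn_forward(x_seq, w_xh, w_hh, b_h):
    """Run one recurrent cell over a sequence and return every hidden state.

    x_seq: (time_steps, input_dim); returns (time_steps, units).
    """
    h = np.zeros(w_hh.shape[0])          # the memory starts at zero
    states = []
    for x_t in x_seq:                    # the same weights are reused at every step
        h = np.tanh(x_t @ w_xh + h @ w_hh + b_h)
        states.append(h)
    return np.stack(states)


rng = np.random.default_rng(0)
input_dim, units, time_steps = 5, 3, 4
w_xh = rng.normal(size=(input_dim, units))
w_hh = rng.normal(size=(units, units))
b_h = np.zeros(units)

x_seq = np.eye(input_dim)[:time_steps]   # four one-hot inputs
states = rnn_forward(x_seq, w_xh, w_hh, b_h)
print(states.shape)                      # (4, 3)
```

Only `w_xh`, `w_hh`, and `b_h` exist no matter how long the sequence is, which is what sharing parameters across time steps means.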
 
Multi-layer recurrent cells
 
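Describing a recurrent cell in TensorFlow
    tf.keras.layers.SimpleRNN(
        units,                   # number of memory units
        activation='tanh',       # activation function; tanh is the default
        return_sequences=False   # True: pass h_t at every time step to the next layer;
                                 # False (the default): pass only the last h_t
    )
Recurrent neural networks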
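A recurrent neural network uses recurrent cells to extract features along the time axis and then feeds them into a fully connected network.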
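
As a rough sketch of that pipeline, the NumPy snippet below takes hidden states such as a recurrent layer would produce, keeps the last time step, and passes it to a fully connected softmax layer. The arrays are random placeholders and all names are made up for illustration; the point is only how the shapes flow.

```python
import numpy as np


def softmax(z):
    """Numerically stable softmax over the last axis."""
    e = np.exp(z - z.max(axis=-1, keepdims=True))
    return e / e.sum(axis=-1, keepdims=True)


rng = np.random.default_rng(1)
batch, time_steps, units, classes = 2, 4, 3, 5

# Placeholder for the hidden states of a recurrent layer, one per time step.
all_states = np.tanh(rng.normal(size=(batch, time_steps, units)))

# Keeping every step corresponds to return_sequences=True;
# keeping only the last step corresponds to return_sequences=False.
last_state = all_states[:, -1, :]

# Fully connected layer on top of the last hidden state.
w_hy = rng.normal(size=(units, classes))
b_y = np.zeros(classes)
probs = softmax(last_state @ w_hy + b_y)

print(all_states.shape, last_state.shape, probs.shape)   # (2, 4, 3) (2, 3) (2, 5)
```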
 
 
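Describing a recurrent neural network in TensorFlow
    tf.keras.layers.SimpleRNN(
        units,                   # number of memory units
        activation='tanh',       # activation function; tanh is the default
        return_sequences=False   # True: pass h_t at every time step to the next layer;
                                 # False (the default): pass only the last h_t
    )
The recurrent computation process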
Input one letter, predict the next letter
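
Before the full script, a small sketch of the data preparation it relies on: each letter is one-hot encoded, and the inputs are reshaped to the [samples, time steps, features] layout described in the script's comments. `np.eye` stands in for the hand-written `id_to_onehot` dictionary here; that substitution is an assumption of this sketch, not the original code.

```python
import numpy as np

input_word = "abcde"
w_to_id = {ch: i for i, ch in enumerate(input_word)}

# Each letter predicts its successor; 'e' wraps around to 'a'.
x_ids = [w_to_id[ch] for ch in input_word]
y_ids = [(i + 1) % len(input_word) for i in x_ids]

one_hot = np.eye(len(input_word))                     # row i is the one-hot code of id i
x_train = one_hot[x_ids].reshape(len(x_ids), 1, 5)    # [samples, time steps, features]
y_train = np.array(y_ids)

print(x_train.shape, y_train.tolist())                # (5, 1, 5) [1, 2, 3, 4, 0]
```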
    import os

    import matplotlib.pyplot as plt
    import numpy as np
    import tensorflow as tf
    from tensorflow.keras.layers import Dense, SimpleRNN

    input_word = "abcde"
    w_to_id = {'a': 0, 'b': 1, 'c': 2, 'd': 3, 'e': 4}  # maps each letter to a numeric id
    id_to_onehot = {0: [1., 0., 0., 0., 0.], 1: [0., 1., 0., 0., 0.], 2: [0., 0., 1., 0., 0.],
                    3: [0., 0., 0., 1., 0.], 4: [0., 0., 0., 0., 1.]}  # maps each id to its one-hot code

    x_train = [id_to_onehot[w_to_id['a']], id_to_onehot[w_to_id['b']], id_to_onehot[w_to_id['c']],
               id_to_onehot[w_to_id['d']], id_to_onehot[w_to_id['e']]]
    y_train = [w_to_id['b'], w_to_id['c'], w_to_id['d'], w_to_id['e'], w_to_id['a']]

    # Shuffle inputs and labels with the same seed so that they stay aligned.
    np.random.seed(7)
    np.random.shuffle(x_train)
    np.random.seed(7)
    np.random.shuffle(y_train)
    tf.random.set_seed(7)

    # Reshape x_train to what SimpleRNN expects:
    # [number of samples, number of time steps, number of features per time step].
    # The whole dataset is fed in, so samples = len(x_train); one letter in, one prediction out,
    # so time steps = 1; each one-hot code has 5 features.
    x_train = np.reshape(x_train, (len(x_train), 1, 5))
    y_train = np.array(y_train)

    model = tf.keras.Sequential([
        SimpleRNN(3),
        Dense(5, activation='softmax')
    ])

    model.compile(optimizer=tf.keras.optimizers.Adam(0.01),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
                  metrics=['sparse_categorical_accuracy'])

    checkpoint_save_path = "./checkpoint/rnn_onehot_1pre1.ckpt"

    if os.path.exists(checkpoint_save_path + '.index'):
        print('-------------load the model-----------------')
        model.load_weights(checkpoint_save_path)

    # fit() is given no validation set, so no validation accuracy is computed;
    # the best model is saved according to the training loss.
    cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_save_path,
                                                     save_weights_only=True,
                                                     save_best_only=True,
                                                     monitor='loss')

    history = model.fit(x_train, y_train, batch_size=32, epochs=100, callbacks=[cp_callback])

    model.summary()

    # print(model.trainable_variables)
    # Write the trained parameters to a text file.
    with open('./weights.txt', 'w') as file:
        for v in model.trainable_variables:
            file.write(str(v.name) + '\n')
            file.write(str(v.shape) + '\n')
            file.write(str(v.numpy()) + '\n')

    ############### show ###############

    # Plot the training accuracy and loss curves.
    acc = history.history['sparse_categorical_accuracy']
    loss = history.history['loss']

    plt.subplot(1, 2, 1)
    plt.plot(acc, label='Training Accuracy')
    plt.title('Training Accuracy')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(loss, label='Training Loss')
    plt.title('Training Loss')
    plt.legend()
    plt.show()

    ############### predict ###############

    preNum = int(input("input the number of test alphabet:"))
    for i in range(preNum):
        alphabet1 = input("input test alphabet:")
        alphabet = [id_to_onehot[w_to_id[alphabet1]]]
        # Reshape to [samples, time steps, features]: one sample, one time step, 5 one-hot features.
        alphabet = np.reshape(alphabet, (1, 1, 5))
        result = model.predict(alphabet)
        pred = int(tf.argmax(result, axis=1)[0])
        tf.print(alphabet1 + '->' + input_word[pred])

Input four consecutive letters, predict the next letter
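
The five training samples in the script for this section are written out by hand. Assuming they are simply every window of four consecutive letters over the cyclic sequence `abcde`, they could also be generated as follows (the helper name `cyclic_windows` is hypothetical):

```python
def cyclic_windows(seq, width):
    """Return (window, next_item) pairs, wrapping around the end of seq."""
    n = len(seq)
    pairs = []
    for start in range(n):
        window = [seq[(start + k) % n] for k in range(width)]
        target = seq[(start + width) % n]
        pairs.append(("".join(window), target))
    return pairs


pairs = cyclic_windows("abcde", 4)
for window, target in pairs:
    print(window, "->", target)
# abcd -> e
# bcde -> a
# cdea -> b
# deab -> c
# eabc -> d
```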
    import os

    import matplotlib.pyplot as plt
    import numpy as np
    import tensorflow as tf
    from tensorflow.keras.layers import Dense, SimpleRNN

    input_word = "abcde"
    w_to_id = {'a': 0, 'b': 1, 'c': 2, 'd': 3, 'e': 4}  # maps each letter to a numeric id
    id_to_onehot = {0: [1., 0., 0., 0., 0.], 1: [0., 1., 0., 0., 0.], 2: [0., 0., 1., 0., 0.],
                    3: [0., 0., 0., 1., 0.], 4: [0., 0., 0., 0., 1.]}  # maps each id to its one-hot code

    x_train = [
        [id_to_onehot[w_to_id['a']], id_to_onehot[w_to_id['b']], id_to_onehot[w_to_id['c']], id_to_onehot[w_to_id['d']]],
        [id_to_onehot[w_to_id['b']], id_to_onehot[w_to_id['c']], id_to_onehot[w_to_id['d']], id_to_onehot[w_to_id['e']]],
        [id_to_onehot[w_to_id['c']], id_to_onehot[w_to_id['d']], id_to_onehot[w_to_id['e']], id_to_onehot[w_to_id['a']]],
        [id_to_onehot[w_to_id['d']], id_to_onehot[w_to_id['e']], id_to_onehot[w_to_id['a']], id_to_onehot[w_to_id['b']]],
        [id_to_onehot[w_to_id['e']], id_to_onehot[w_to_id['a']], id_to_onehot[w_to_id['b']], id_to_onehot[w_to_id['c']]],
    ]
    y_train = [w_to_id['e'], w_to_id['a'], w_to_id['b'], w_to_id['c'], w_to_id['d']]

    # Shuffle inputs and labels with the same seed so that they stay aligned.
    np.random.seed(7)
    np.random.shuffle(x_train)
    np.random.seed(7)
    np.random.shuffle(y_train)
    tf.random.set_seed(7)

    # Reshape x_train to what SimpleRNN expects:
    # [number of samples, number of time steps, number of features per time step].
    # The whole dataset is fed in, so samples = len(x_train); four letters in, one prediction out,
    # so time steps = 4; each one-hot code has 5 features.
    x_train = np.reshape(x_train, (len(x_train), 4, 5))
    y_train = np.array(y_train)

    model = tf.keras.Sequential([
        SimpleRNN(3),
        Dense(5, activation='softmax')
    ])

    model.compile(optimizer=tf.keras.optimizers.Adam(0.01),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
                  metrics=['sparse_categorical_accuracy'])

    checkpoint_save_path = "./checkpoint/rnn_onehot_4pre1.ckpt"

    if os.path.exists(checkpoint_save_path + '.index'):
        print('-------------load the model-----------------')
        model.load_weights(checkpoint_save_path)

    # fit() is given no validation set, so no validation accuracy is computed;
    # the best model is saved according to the training loss.
    cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_save_path,
                                                     save_weights_only=True,
                                                     save_best_only=True,
                                                     monitor='loss')

    history = model.fit(x_train, y_train, batch_size=32, epochs=100, callbacks=[cp_callback])

    model.summary()

    # print(model.trainable_variables)
    # Write the trained parameters to a text file.
    with open('./weights.txt', 'w') as file:
        for v in model.trainable_variables:
            file.write(str(v.name) + '\n')
            file.write(str(v.shape) + '\n')
            file.write(str(v.numpy()) + '\n')

    ############### show ###############

    # Plot the training accuracy and loss curves.
    acc = history.history['sparse_categorical_accuracy']
    loss = history.history['loss']

    plt.subplot(1, 2, 1)
    plt.plot(acc, label='Training Accuracy')
    plt.title('Training Accuracy')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(loss, label='Training Loss')
    plt.title('Training Loss')
    plt.legend()
    plt.show()

    ############### predict ###############

    preNum = int(input("input the number of test alphabet:"))
    for i in range(preNum):
        alphabet1 = input("input test alphabet:")
        alphabet = [id_to_onehot[w_to_id[a]] for a in alphabet1]
        # Reshape to [samples, time steps, features]: one sample, four time steps, 5 one-hot features.
        alphabet = np.reshape(alphabet, (1, 4, 5))
        result = model.predict(alphabet)
        pred = int(tf.argmax(result, axis=1)[0])
        tf.print(alphabet1 + '->' + input_word[pred])

Embedding encoding
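Embedding is a way of encoding words as low-dimensional vectors. The encoding is optimized during neural network training and can express how words relate to one another.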
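
One way to picture an embedding layer is as a trainable lookup table with one row per word. The NumPy sketch below uses a random table as a stand-in for trained weights (an assumption made purely for illustration) and shows that indexing row `i` gives the same vector as multiplying the one-hot code of `i` by the table.

```python
import numpy as np

vocab_size, embed_dim = 5, 2
rng = np.random.default_rng(0)
table = rng.normal(size=(vocab_size, embed_dim))   # stand-in for trained weights

ids = np.array([[0, 1, 2, 3]])                     # [samples, time steps]
embedded = table[ids]                              # [samples, time steps, embed_dim]

one_hot = np.eye(vocab_size)[ids]                  # [samples, time steps, vocab_size]
same = bool(np.allclose(embedded, one_hot @ table))

print(embedded.shape, same)                        # (1, 4, 2) True
```

The lookup form needs only integer ids as input, which is why the Embedding-based scripts feed ids rather than one-hot vectors.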
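Describing Embedding encoding in TensorFlow
    tf.keras.layers.Embedding(
        input_dim,    # vocabulary size
        output_dim    # encoding dimension: how many numbers represent one word
    )
Replacing one-hot codes with Embedding: one-character input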
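
The script for this section calls `model.summary()`. As a sanity check on what it reports, the parameter counts can be worked out by hand, assuming the usual layer definitions (an embedding table, a simple RNN with input weights, recurrent weights and a bias, and a dense layer with weights and a bias). The helper functions below are hypothetical and exist only for this calculation.

```python
def embedding_params(vocab_size, embed_dim):
    # one embed_dim-sized row per vocabulary entry
    return vocab_size * embed_dim


def simple_rnn_params(input_dim, units):
    # input weights + recurrent weights + bias
    return input_dim * units + units * units + units


def dense_params(input_dim, units):
    # weights + bias
    return input_dim * units + units


counts = {
    "Embedding(5, 2)": embedding_params(5, 2),   # 5 * 2 = 10
    "SimpleRNN(3)": simple_rnn_params(2, 3),     # 2 * 3 + 3 * 3 + 3 = 18
    "Dense(5)": dense_params(3, 5),              # 3 * 5 + 5 = 20
}
total = sum(counts.values())
print(counts, total)                             # total is 48
```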
    import os

    import matplotlib.pyplot as plt
    import numpy as np
    import tensorflow as tf
    from tensorflow.keras.layers import Dense, SimpleRNN, Embedding

    input_word = "abcde"
    w_to_id = {'a': 0, 'b': 1, 'c': 2, 'd': 3, 'e': 4}  # maps each letter to a numeric id

    x_train = [w_to_id['a'], w_to_id['b'], w_to_id['c'], w_to_id['d'], w_to_id['e']]
    y_train = [w_to_id['b'], w_to_id['c'], w_to_id['d'], w_to_id['e'], w_to_id['a']]

    # Shuffle inputs and labels with the same seed so that they stay aligned.
    np.random.seed(7)
    np.random.shuffle(x_train)
    np.random.seed(7)
    np.random.shuffle(y_train)
    tf.random.set_seed(7)

    # Reshape x_train to what Embedding expects: [number of samples, number of time steps].
    # The whole dataset is fed in, so samples = len(x_train); one letter in, one prediction out,
    # so time steps = 1.
    x_train = np.reshape(x_train, (len(x_train), 1))
    y_train = np.array(y_train)

    model = tf.keras.Sequential([
        Embedding(5, 2),
        SimpleRNN(3),
        Dense(5, activation='softmax')
    ])

    model.compile(optimizer=tf.keras.optimizers.Adam(0.01),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
                  metrics=['sparse_categorical_accuracy'])

    checkpoint_save_path = "./checkpoint/run_embedding_1pre1.ckpt"

    if os.path.exists(checkpoint_save_path + '.index'):
        print('-------------load the model-----------------')
        model.load_weights(checkpoint_save_path)

    # fit() is given no validation set, so no validation accuracy is computed;
    # the best model is saved according to the training loss.
    cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_save_path,
                                                     save_weights_only=True,
                                                     save_best_only=True,
                                                     monitor='loss')

    history = model.fit(x_train, y_train, batch_size=32, epochs=100, callbacks=[cp_callback])

    model.summary()

    # print(model.trainable_variables)
    # Write the trained parameters to a text file.
    with open('./weights.txt', 'w') as file:
        for v in model.trainable_variables:
            file.write(str(v.name) + '\n')
            file.write(str(v.shape) + '\n')
            file.write(str(v.numpy()) + '\n')

    ############### show ###############

    # Plot the training accuracy and loss curves.
    acc = history.history['sparse_categorical_accuracy']
    loss = history.history['loss']

    plt.subplot(1, 2, 1)
    plt.plot(acc, label='Training Accuracy')
    plt.title('Training Accuracy')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(loss, label='Training Loss')
    plt.title('Training Loss')
    plt.legend()
    plt.show()

    ############### predict ###############

    preNum = int(input("input the number of test alphabet:"))
    for i in range(preNum):
        alphabet1 = input("input test alphabet:")
        alphabet = [w_to_id[alphabet1]]
        # Reshape to [samples, time steps]: one sample, one time step.
        alphabet = np.reshape(alphabet, (1, 1))
        result = model.predict(alphabet)
        pred = int(tf.argmax(result, axis=1)[0])
        tf.print(alphabet1 + '->' + input_word[pred])

Replacing one-hot codes with Embedding: four-character input
    import os

    import matplotlib.pyplot as plt
    import numpy as np
    import tensorflow as tf
    from tensorflow.keras.layers import Dense, SimpleRNN, Embedding

    input_word = "abcdefghijklmnopqrstuvwxyz"
    w_to_id = {'a': 0, 'b': 1, 'c': 2, 'd': 3, 'e': 4,
               'f': 5, 'g': 6, 'h': 7, 'i': 8, 'j': 9,
               'k': 10, 'l': 11, 'm': 12, 'n': 13, 'o': 14,
               'p': 15, 'q': 16, 'r': 17, 's': 18, 't': 19,
               'u': 20, 'v': 21, 'w': 22, 'x': 23, 'y': 24, 'z': 25}  # maps each letter to a numeric id

    training_set_scaled = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
                           11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
                           21, 22, 23, 24, 25]

    x_train = []
    y_train = []

    # Each run of four consecutive ids is an input; the id that follows is its label.
    for i in range(4, 26):
        x_train.append(training_set_scaled[i - 4:i])
        y_train.append(training_set_scaled[i])

    # Shuffle inputs and labels with the same seed so that they stay aligned.
    np.random.seed(7)
    np.random.shuffle(x_train)
    np.random.seed(7)
    np.random.shuffle(y_train)
    tf.random.set_seed(7)

    # Reshape x_train to what Embedding expects: [number of samples, number of time steps].
    # The whole dataset is fed in, so samples = len(x_train); four letters in, one prediction out,
    # so time steps = 4.
    x_train = np.reshape(x_train, (len(x_train), 4))
    y_train = np.array(y_train)

    model = tf.keras.Sequential([
        Embedding(26, 2),
        SimpleRNN(10),
        Dense(26, activation='softmax')
    ])

    model.compile(optimizer=tf.keras.optimizers.Adam(0.01),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
                  metrics=['sparse_categorical_accuracy'])

    checkpoint_save_path = "./checkpoint/rnn_embedding_4pre1.ckpt"

    if os.path.exists(checkpoint_save_path + '.index'):
        print('-------------load the model-----------------')
        model.load_weights(checkpoint_save_path)

    # fit() is given no validation set, so no validation accuracy is computed;
    # the best model is saved according to the training loss.
    cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_save_path,
                                                     save_weights_only=True,
                                                     save_best_only=True,
                                                     monitor='loss')

    history = model.fit(x_train, y_train, batch_size=32, epochs=100, callbacks=[cp_callback])

    model.summary()

    # Write the trained parameters to a text file.
    with open('./weights.txt', 'w') as file:
        for v in model.trainable_variables:
            file.write(str(v.name) + '\n')
            file.write(str(v.shape) + '\n')
            file.write(str(v.numpy()) + '\n')

    ############### show ###############

    # Plot the training accuracy and loss curves.
    acc = history.history['sparse_categorical_accuracy']
    loss = history.history['loss']

    plt.subplot(1, 2, 1)
    plt.plot(acc, label='Training Accuracy')
    plt.title('Training Accuracy')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(loss, label='Training Loss')
    plt.title('Training Loss')
    plt.legend()
    plt.show()

    ############### predict ###############

    preNum = int(input("input the number of test alphabet:"))
    for i in range(preNum):
        alphabet1 = input("input test alphabet:")
        alphabet = [w_to_id[a] for a in alphabet1]
        # Reshape to [samples, time steps]: one sample, four time steps.
        alphabet = np.reshape(alphabet, (1, 4))
        result = model.predict(alphabet)
        pred = int(tf.argmax(result, axis=1)[0])
        tf.print(alphabet1 + '->' + input_word[pred])

Stock prediction with a recurrent neural network
Downloading the stock data
    import tushare as ts

    # Daily K-line data for stock 600519 (Kweichow Moutai) over ten years.
    df1 = ts.get_k_data('600519', ktype='D', start='2010-04-26', end='2020-04-26')

    datapath1 = "./SH600519.csv"
    df1.to_csv(datapath1)

Stock prediction
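
The script in this section normalizes prices with statistics taken from the training split only and then cuts each split into 60-day windows. Here is a minimal NumPy sketch of those two steps, using a synthetic price series in place of the real CSV; the data and the helper names `minmax_fit`, `minmax_apply`, and `make_windows` are assumptions made for illustration.

```python
import numpy as np


def minmax_fit(train):
    """Return the (min, max) of the training data only."""
    return train.min(), train.max()


def minmax_apply(values, lo, hi):
    """Scale values so that the training range maps to [0, 1]."""
    return (values - lo) / (hi - lo)


def make_windows(series, width):
    """Pair each run of `width` values with the value that follows it."""
    x = np.array([series[i - width:i] for i in range(width, len(series))])
    y = series[width:]
    return x.reshape(-1, width, 1), y      # [samples, time steps, features]


prices = np.linspace(100.0, 200.0, 2426)   # synthetic stand-in for opening prices
train, test = prices[:2426 - 300], prices[2426 - 300:]

lo, hi = minmax_fit(train)
x_train, y_train = make_windows(minmax_apply(train, lo, hi), 60)
x_test, y_test = make_windows(minmax_apply(test, lo, hi), 60)

print(x_train.shape, x_test.shape)         # (2066, 60, 1) (240, 60, 1)
```

Because the scaling is fitted on the training split, test values can fall outside [0, 1]; in this synthetic rising series they all exceed 1.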
    import math
    import os

    import matplotlib.pyplot as plt
    import numpy as np
    import pandas as pd
    import tensorflow as tf
    from sklearn.metrics import mean_squared_error, mean_absolute_error
    from sklearn.preprocessing import MinMaxScaler
    from tensorflow.keras.layers import Dropout, Dense, SimpleRNN

    maotai = pd.read_csv('./SH600519.csv')  # read the stock data file

    # Opening prices of the first 2426 - 300 = 2126 days form the training set. Columns are counted
    # from 0 and 2:3 selects the half-open range [2, 3), i.e. column C, the opening price.
    training_set = maotai.iloc[0:2426 - 300, 2:3].values
    test_set = maotai.iloc[2426 - 300:, 2:3].values  # opening prices of the last 300 days form the test set

    # Normalization
    sc = MinMaxScaler(feature_range=(0, 1))  # scale values into the range [0, 1]
    training_set_scaled = sc.fit_transform(training_set)  # learn min and max from the training set, then scale it
    test_set = sc.transform(test_set)  # scale the test set with the training set's min and max

    x_train = []
    y_train = []

    x_test = []
    y_test = []

    # Training set: the first 2426 - 300 = 2126 days in the CSV.
    # Take each run of 60 consecutive opening prices as the input features x_train and the 61st
    # day's price as the label; the loop builds 2426 - 300 - 60 = 2066 samples.
    for i in range(60, len(training_set_scaled)):
        x_train.append(training_set_scaled[i - 60:i, 0])
        y_train.append(training_set_scaled[i, 0])
    # Shuffle the training set, using the same seed for inputs and labels.
    np.random.seed(7)
    np.random.shuffle(x_train)
    np.random.seed(7)
    np.random.shuffle(y_train)
    tf.random.set_seed(7)
    # Convert the training set from lists to arrays.
    x_train, y_train = np.array(x_train), np.array(y_train)

    # Reshape x_train to what the RNN expects: [samples, time steps, features per time step].
    # All x_train.shape[0] = 2066 samples are fed in; 60 opening prices predict the 61st day's
    # price, so there are 60 time steps; each step carries one opening price, so there is 1 feature.
    x_train = np.reshape(x_train, (x_train.shape[0], 60, 1))
    # Test set: the last 300 days in the CSV.
    # Take each run of 60 consecutive opening prices as the input features x_test and the 61st
    # day's price as the label; the loop builds 300 - 60 = 240 samples.
    for i in range(60, len(test_set)):
        x_test.append(test_set[i - 60:i, 0])
        y_test.append(test_set[i, 0])
    # Convert the test set to arrays and reshape it to [samples, time steps, features per time step].
    x_test, y_test = np.array(x_test), np.array(y_test)
    x_test = np.reshape(x_test, (x_test.shape[0], 60, 1))

    model = tf.keras.Sequential([
        SimpleRNN(80, return_sequences=True),
        Dropout(0.2),
        SimpleRNN(100),
        Dropout(0.2),
        Dense(1)
    ])

    model.compile(optimizer=tf.keras.optimizers.Adam(0.001),
                  loss='mean_squared_error')  # mean squared error as the loss
    # Only the loss is monitored here, not accuracy, so the metrics option is omitted
    # and each epoch reports only the loss.

    checkpoint_save_path = "./checkpoint/rnn_stock.ckpt"

    if os.path.exists(checkpoint_save_path + '.index'):
        print('-------------load the model-----------------')
        model.load_weights(checkpoint_save_path)

    cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_save_path,
                                                     save_weights_only=True,
                                                     save_best_only=True,
                                                     monitor='val_loss')

    history = model.fit(x_train, y_train, batch_size=64, epochs=50,
                        validation_data=(x_test, y_test), validation_freq=1,
                        callbacks=[cp_callback])

    model.summary()

    # Write the trained parameters to a text file.
    with open('./weights.txt', 'w') as file:
        for v in model.trainable_variables:
            file.write(str(v.name) + '\n')
            file.write(str(v.shape) + '\n')
            file.write(str(v.numpy()) + '\n')

    loss = history.history['loss']
    val_loss = history.history['val_loss']

    plt.plot(loss, label='Training Loss')
    plt.plot(val_loss, label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.legend()
    plt.show()

    ################## predict ######################
    # Run the test set through the model.
    predicted_stock_price = model.predict(x_test)
    # Undo the normalization of the predictions: from [0, 1] back to the original range.
    predicted_stock_price = sc.inverse_transform(predicted_stock_price)
    # Undo the normalization of the actual prices: from [0, 1] back to the original range.
    real_stock_price = sc.inverse_transform(test_set[60:])
    # Plot actual against predicted prices.
    plt.plot(real_stock_price, color='red', label='MaoTai Stock Price')
    plt.plot(predicted_stock_price, color='blue', label='Predicted MaoTai Stock Price')
    plt.title('MaoTai Stock Price Prediction')
    plt.xlabel('Time')
    plt.ylabel('MaoTai Stock Price')
    plt.legend()
    plt.show()

    ########## evaluate ##############
    # MSE, mean squared error: E[(predicted - actual)^2]
    mse = mean_squared_error(real_stock_price, predicted_stock_price)
    # RMSE, root mean squared error: sqrt(MSE)
    rmse = math.sqrt(mse)
    # MAE, mean absolute error: E[|predicted - actual|]
    mae = mean_absolute_error(real_stock_price, predicted_stock_price)
    print('Mean squared error: %.6f' % mse)
    print('Root mean squared error: %.6f' % rmse)
    print('Mean absolute error: %.6f' % mae)

Long short-term memory networks (LSTM)
LSTM uses gating units to ease the long-term dependency problem of plain RNNs.
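
For reference, a single LSTM step can be sketched in NumPy as follows. This uses the standard formulation (input, forget, and output gates plus a candidate cell state), which these notes summarize but do not write out; the weight layout and the names are illustrative and are not claimed to match TensorFlow's internal ordering.

```python
import numpy as np


def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))


def lstm_step(x_t, h_prev, c_prev, w, u, b):
    """One LSTM time step.

    w: (input_dim, 4 * units), u: (units, 4 * units), b: (4 * units,)
    """
    z = x_t @ w + h_prev @ u + b
    i, f, o, g = np.split(z, 4)
    i, f, o = sigmoid(i), sigmoid(f), sigmoid(o)   # input, forget, and output gates
    g = np.tanh(g)                                 # candidate cell state
    c_t = f * c_prev + i * g                       # long-term memory
    h_t = o * np.tanh(c_t)                         # short-term memory, also the output
    return h_t, c_t


rng = np.random.default_rng(0)
input_dim, units = 1, 4
w = rng.normal(size=(input_dim, 4 * units))
u = rng.normal(size=(units, 4 * units))
b = np.zeros(4 * units)

h, c = np.zeros(units), np.zeros(units)
for x_t in np.array([[0.1], [0.2], [0.3]]):        # three time steps, one feature each
    h, c = lstm_step(x_t, h, c, w, u, b)

print(h.shape, c.shape)                            # (4,) (4,)
```

The forget gate `f` scales the old cell state instead of overwriting it, which is the mechanism that lets information survive across many time steps.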
 
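Describing the LSTM layer in TensorFlow
    tf.keras.layers.LSTM(
        units,                   # number of memory units
        return_sequences=False   # False (the default): return only the last output;
                                 # True: return the output at every time step
    )
Stock prediction with LSTM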
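
The script in this section ends by scoring its predictions with three error measures. As a quick reference, they can be computed directly in NumPy; the numbers below are invented purely to make the arithmetic easy to follow.

```python
import numpy as np

real = np.array([100.0, 102.0, 101.0, 105.0])        # made-up prices
predicted = np.array([101.0, 100.0, 101.0, 107.0])   # made-up predictions

errors = predicted - real            # [1, -2, 0, 2]
mse = np.mean(errors ** 2)           # (1 + 4 + 0 + 4) / 4
rmse = np.sqrt(mse)
mae = np.mean(np.abs(errors))        # (1 + 2 + 0 + 2) / 4

print(mse, rmse, mae)                # 2.25 1.5 1.25
```

RMSE and MAE are in the same unit as the price, which makes them easier to read than MSE when comparing models.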
    import math
    import os

    import matplotlib.pyplot as plt
    import numpy as np
    import pandas as pd
    import tensorflow as tf
    from sklearn.metrics import mean_squared_error, mean_absolute_error
    from sklearn.preprocessing import MinMaxScaler
    from tensorflow.keras.layers import Dropout, Dense, LSTM

    maotai = pd.read_csv('./SH600519.csv')  # read the stock data file

    # Opening prices of the first 2426 - 300 = 2126 days form the training set. Columns are counted
    # from 0 and 2:3 selects the half-open range [2, 3), i.e. column C, the opening price.
    training_set = maotai.iloc[0:2426 - 300, 2:3].values
    test_set = maotai.iloc[2426 - 300:, 2:3].values  # opening prices of the last 300 days form the test set

    # Normalization
    sc = MinMaxScaler(feature_range=(0, 1))  # scale values into the range [0, 1]
    training_set_scaled = sc.fit_transform(training_set)  # learn min and max from the training set, then scale it
    test_set = sc.transform(test_set)  # scale the test set with the training set's min and max

    x_train = []
    y_train = []

    x_test = []
    y_test = []

    # Training set: the first 2426 - 300 = 2126 days in the CSV.
    # Take each run of 60 consecutive opening prices as the input features x_train and the 61st
    # day's price as the label; the loop builds 2426 - 300 - 60 = 2066 samples.
    for i in range(60, len(training_set_scaled)):
        x_train.append(training_set_scaled[i - 60:i, 0])
        y_train.append(training_set_scaled[i, 0])
    # Shuffle the training set, using the same seed for inputs and labels.
    np.random.seed(7)
    np.random.shuffle(x_train)
    np.random.seed(7)
    np.random.shuffle(y_train)
    tf.random.set_seed(7)
    # Convert the training set from lists to arrays.
    x_train, y_train = np.array(x_train), np.array(y_train)

    # Reshape x_train to what the RNN expects: [samples, time steps, features per time step].
    # All x_train.shape[0] = 2066 samples are fed in; 60 opening prices predict the 61st day's
    # price, so there are 60 time steps; each step carries one opening price, so there is 1 feature.
    x_train = np.reshape(x_train, (x_train.shape[0], 60, 1))
    # Test set: the last 300 days in the CSV.
    # Take each run of 60 consecutive opening prices as the input features x_test and the 61st
    # day's price as the label; the loop builds 300 - 60 = 240 samples.
    for i in range(60, len(test_set)):
        x_test.append(test_set[i - 60:i, 0])
        y_test.append(test_set[i, 0])
    # Convert the test set to arrays and reshape it to [samples, time steps, features per time step].
    x_test, y_test = np.array(x_test), np.array(y_test)
    x_test = np.reshape(x_test, (x_test.shape[0], 60, 1))

    model = tf.keras.Sequential([
        LSTM(80, return_sequences=True),
        Dropout(0.2),
        LSTM(100),
        Dropout(0.2),
        Dense(1)
    ])

    model.compile(optimizer=tf.keras.optimizers.Adam(0.001),
                  loss='mean_squared_error')  # mean squared error as the loss
    # Only the loss is monitored here, not accuracy, so the metrics option is omitted
    # and each epoch reports only the loss.

    checkpoint_save_path = "./checkpoint/LSTM_stock.ckpt"

    if os.path.exists(checkpoint_save_path + '.index'):
        print('-------------load the model-----------------')
        model.load_weights(checkpoint_save_path)

    cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_save_path,
                                                     save_weights_only=True,
                                                     save_best_only=True,
                                                     monitor='val_loss')

    history = model.fit(x_train, y_train, batch_size=64, epochs=50,
                        validation_data=(x_test, y_test), validation_freq=1,
                        callbacks=[cp_callback])

    model.summary()

    # Write the trained parameters to a text file.
    with open('./weights.txt', 'w') as file:
        for v in model.trainable_variables:
            file.write(str(v.name) + '\n')
            file.write(str(v.shape) + '\n')
            file.write(str(v.numpy()) + '\n')

    loss = history.history['loss']
    val_loss = history.history['val_loss']

    plt.plot(loss, label='Training Loss')
    plt.plot(val_loss, label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.legend()
    plt.show()

    ################## predict ######################
    # Run the test set through the model.
    predicted_stock_price = model.predict(x_test)
    # Undo the normalization of the predictions: from [0, 1] back to the original range.
    predicted_stock_price = sc.inverse_transform(predicted_stock_price)
    # Undo the normalization of the actual prices: from [0, 1] back to the original range.
    real_stock_price = sc.inverse_transform(test_set[60:])
    # Plot actual against predicted prices.
    plt.plot(real_stock_price, color='red', label='MaoTai Stock Price')
    plt.plot(predicted_stock_price, color='blue', label='Predicted MaoTai Stock Price')
    plt.title('MaoTai Stock Price Prediction')
    plt.xlabel('Time')
    plt.ylabel('MaoTai Stock Price')
    plt.legend()
    plt.show()

    ########## evaluate ##############
    # MSE, mean squared error: E[(predicted - actual)^2]
    mse = mean_squared_error(real_stock_price, predicted_stock_price)
    # RMSE, root mean squared error: sqrt(MSE)
    rmse = math.sqrt(mse)
    # MAE, mean absolute error: E[|predicted - actual|]
    mae = mean_absolute_error(real_stock_price, predicted_stock_price)
    print('Mean squared error: %.6f' % mse)
    print('Root mean squared error: %.6f' % rmse)
    print('Mean absolute error: %.6f' % mae)

GRU networks
A GRU lets the memory state h_t combine long-term and short-term memory.
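
A single GRU step can be sketched in NumPy as below. It assumes the common formulation with an update gate and a reset gate, blending memory as h_t = z * h_(t-1) + (1 - z) * candidate; some references swap the roles of z and 1 - z, and the weight layout and names here are illustrative rather than TensorFlow's.

```python
import numpy as np


def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))


def gru_step(x_t, h_prev, w, u, b):
    """One GRU time step.

    w: (3, input_dim, units), u: (3, units, units), b: (3, units),
    indexed as 0 = update gate, 1 = reset gate, 2 = candidate state.
    """
    z = sigmoid(x_t @ w[0] + h_prev @ u[0] + b[0])              # update gate
    r = sigmoid(x_t @ w[1] + h_prev @ u[1] + b[1])              # reset gate
    h_cand = np.tanh(x_t @ w[2] + (r * h_prev) @ u[2] + b[2])   # candidate memory
    return z * h_prev + (1.0 - z) * h_cand                      # blend old and new memory


rng = np.random.default_rng(0)
input_dim, units = 1, 4
w = rng.normal(size=(3, input_dim, units))
u = rng.normal(size=(3, units, units))
b = np.zeros((3, units))

h = np.zeros(units)
for x_t in np.array([[0.1], [0.2], [0.3]]):                     # three time steps, one feature each
    h = gru_step(x_t, h, w, u, b)

print(h.shape)                                                  # (4,)
```

Unlike the LSTM, there is no separate cell state: the single vector `h` carries both the long-term and the short-term memory.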
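Describing the GRU layer in TensorFlow
    tf.keras.layers.GRU(
        units,                   # number of memory units
        return_sequences=False   # False (the default): return only the last output;
                                 # True: return the output at every time step
    )
Stock prediction with GRU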
    import math
    import os

    import matplotlib.pyplot as plt
    import numpy as np
    import pandas as pd
    import tensorflow as tf
    from sklearn.metrics import mean_squared_error, mean_absolute_error
    from sklearn.preprocessing import MinMaxScaler
    from tensorflow.keras.layers import Dropout, Dense, GRU

    maotai = pd.read_csv('./SH600519.csv')  # read the stock data file

    # Opening prices of the first 2426 - 300 = 2126 days form the training set. Columns are counted
    # from 0 and 2:3 selects the half-open range [2, 3), i.e. column C, the opening price.
    training_set = maotai.iloc[0:2426 - 300, 2:3].values
    test_set = maotai.iloc[2426 - 300:, 2:3].values  # opening prices of the last 300 days form the test set

    # Normalization
    sc = MinMaxScaler(feature_range=(0, 1))  # scale values into the range [0, 1]
    training_set_scaled = sc.fit_transform(training_set)  # learn min and max from the training set, then scale it
    test_set = sc.transform(test_set)  # scale the test set with the training set's min and max

    x_train = []
    y_train = []

    x_test = []
    y_test = []

    # Training set: the first 2426 - 300 = 2126 days in the CSV.
    # Take each run of 60 consecutive opening prices as the input features x_train and the 61st
    # day's price as the label; the loop builds 2426 - 300 - 60 = 2066 samples.
    for i in range(60, len(training_set_scaled)):
        x_train.append(training_set_scaled[i - 60:i, 0])
        y_train.append(training_set_scaled[i, 0])
    # Shuffle the training set, using the same seed for inputs and labels.
    np.random.seed(7)
    np.random.shuffle(x_train)
    np.random.seed(7)
    np.random.shuffle(y_train)
    tf.random.set_seed(7)
    # Convert the training set from lists to arrays.
    x_train, y_train = np.array(x_train), np.array(y_train)

    # Reshape x_train to what the RNN expects: [samples, time steps, features per time step].
    # All x_train.shape[0] = 2066 samples are fed in; 60 opening prices predict the 61st day's
    # price, so there are 60 time steps; each step carries one opening price, so there is 1 feature.
    x_train = np.reshape(x_train, (x_train.shape[0], 60, 1))
    # Test set: the last 300 days in the CSV.
    # Take each run of 60 consecutive opening prices as the input features x_test and the 61st
    # day's price as the label; the loop builds 300 - 60 = 240 samples.
    for i in range(60, len(test_set)):
        x_test.append(test_set[i - 60:i, 0])
        y_test.append(test_set[i, 0])
    # Convert the test set to arrays and reshape it to [samples, time steps, features per time step].
    x_test, y_test = np.array(x_test), np.array(y_test)
    x_test = np.reshape(x_test, (x_test.shape[0], 60, 1))

    model = tf.keras.Sequential([
        GRU(80, return_sequences=True),
        Dropout(0.2),
        GRU(100),
        Dropout(0.2),
        Dense(1)
    ])

    model.compile(optimizer=tf.keras.optimizers.Adam(0.001),
                  loss='mean_squared_error')  # mean squared error as the loss
    # Only the loss is monitored here, not accuracy, so the metrics option is omitted
    # and each epoch reports only the loss.

    checkpoint_save_path = "./checkpoint/stock.ckpt"

    if os.path.exists(checkpoint_save_path + '.index'):
        print('-------------load the model-----------------')
        model.load_weights(checkpoint_save_path)

    cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_save_path,
                                                     save_weights_only=True,
                                                     save_best_only=True,
                                                     monitor='val_loss')

    history = model.fit(x_train, y_train, batch_size=64, epochs=50,
                        validation_data=(x_test, y_test), validation_freq=1,
                        callbacks=[cp_callback])

    model.summary()

    # Write the trained parameters to a text file.
    with open('./weights.txt', 'w') as file:
        for v in model.trainable_variables:
            file.write(str(v.name) + '\n')
            file.write(str(v.shape) + '\n')
            file.write(str(v.numpy()) + '\n')

    loss = history.history['loss']
    val_loss = history.history['val_loss']

    plt.plot(loss, label='Training Loss')
    plt.plot(val_loss, label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.legend()
    plt.show()

    ################## predict ######################
    # Run the test set through the model.
    predicted_stock_price = model.predict(x_test)
    # Undo the normalization of the predictions: from [0, 1] back to the original range.
    predicted_stock_price = sc.inverse_transform(predicted_stock_price)
    # Undo the normalization of the actual prices: from [0, 1] back to the original range.
    real_stock_price = sc.inverse_transform(test_set[60:])
    # Plot actual against predicted prices.
    plt.plot(real_stock_price, color='red', label='MaoTai Stock Price')
    plt.plot(predicted_stock_price, color='blue', label='Predicted MaoTai Stock Price')
    plt.title('MaoTai Stock Price Prediction')
    plt.xlabel('Time')
    plt.ylabel('MaoTai Stock Price')
    plt.legend()
    plt.show()

    ########## evaluate ##############
    # MSE, mean squared error: E[(predicted - actual)^2]
    mse = mean_squared_error(real_stock_price, predicted_stock_price)
    # RMSE, root mean squared error: sqrt(MSE)
    rmse = math.sqrt(mse)
    # MAE, mean absolute error: E[|predicted - actual|]
    mae = mean_absolute_error(real_stock_price, predicted_stock_price)
    print('Mean squared error: %.6f' % mse)
    print('Root mean squared error: %.6f' % rmse)
    print('Mean absolute error: %.6f' % mae)
