電圧時系列信号の次のステップ電圧値を予測するためにLSTMを使用します。LSTM履歴長対予測誤差
なぜ使用してより長い配列(5またはバック時間で10段階)LSTMが予測を改善し、予測誤差を低減させない訓練する:私は質問がありますか? (実際にはそれを劣化させる - 数字を見ると、sequence_length = 5の結果がsequence_length = 10より良い)
testplot( 'epochs:10'、 'ratio:1'、 'sequence_length:10' 」、 '0.00116802704509')
testplot( 'エポック:10'、 '比:1'、 'sequence_length:5'、 'エラーを意味する:'、 '0.000495359163296'
(緑色の予測信号、実数赤の場合)
import os
import matplotlib.pyplot as plt
import numpy as np
import time
import csv
from keras.layers.core import Dense, Activation, Dropout
from keras.layers.recurrent import LSTM
from keras.models import Sequential
np.random.seed(1234)
def data_power_consumption(path_to_dataset,
sequence_length=50,
ratio=1.0):
max_values = ratio * 2049280
with open(path_to_dataset) as f:
data = csv.reader(f, delimiter=",")
power = []
nb_of_values = 0
for line in data:
try:
power.append(float(line[4]))
nb_of_values += 1
except ValueError:
pass
# 2049280.0 is the total number of valid values, i.e. ratio = 1.0
if nb_of_values >= max_values:
print "max value", nb_of_values
break
print "Data loaded from csv. Formatting..."
result = []
for index in range(len(power) - sequence_length):
result.append(power[index: index + sequence_length])
result = np.array(result) # shape (2049230, 50)
result_mean = result.mean()
result -= result_mean
print "Shift : ", result_mean
print "Data : ", result.shape
row = round(0.9 * result.shape[0])
train = result[:row, :]
np.random.shuffle(train)
X_train = train[:, :-1]
y_train = train[:, -1]
X_test = result[row:, :-1]
y_test = result[row:, -1]
X_train = np.reshape(X_train, (X_train.shape[0], X_train.shape[1], 1))
X_test = np.reshape(X_test, (X_test.shape[0], X_test.shape[1], 1))
return [X_train, y_train, X_test, y_test]
def build_model():
model = Sequential()
layers = [1, 50, 100, 1]
model.add(LSTM(
input_dim=layers[0],
output_dim=layers[1],
return_sequences=True))
model.add(Dropout(0.2))
model.add(LSTM(
layers[2],
return_sequences=False))
model.add(Dropout(0.2))
model.add(Dense(
output_dim=layers[3]))
model.add(Activation("linear"))
start = time.time()
model.compile(loss="mse", optimizer="adam") # consider adam
print "Compilation Time : ", time.time() - start
return model
def run_network(model=None, data=None):
global_start_time = time.time()
epochs = 10
ratio = 1
sequence_length = 3
path_to_dataset = 'TIMBER_DATA_1.csv'
if data is None:
print 'Loading data... '
X_train, y_train, X_test, y_test = data_power_consumption(
path_to_dataset, sequence_length, ratio)
else:
X_train, y_train, X_test, y_test = data
print '\nData Loaded. Compiling...\n'
if model is None:
model = build_model()
try:
model.fit(
X_train, y_train,
batch_size=512, nb_epoch=epochs, validation_split=0.05)
predicted = model.predict(X_test)
predicted = np.reshape(predicted, (predicted.size,))
print "done"
except KeyboardInterrupt:
print 'Training duration (s) : ', time.time() - global_start_time
return model, y_test, 0
try:
fig, ax = plt.subplots()
txt = "epochs: " + str(epochs), "ratio: " + str(ratio), "sequence_length: " + str(sequence_length)
# calculate error (shift predicted by "sequence_length - 1 and apply mean with abs)
y_test_mean = y_test - np.mean(y_test)
y_test_mean_shifted = y_test_mean[:-1*(sequence_length - 1)]
predicted_mean = predicted - np.mean(predicted)
predicted_mean_shifted = predicted_mean[(sequence_length - 1):]
prediction_error = np.mean(abs(y_test_mean_shifted - predicted_mean_shifted))
text_mean = "mean error: ", str(prediction_error)
txt = txt + text_mean
# Now add the legend with some customizations.
legend = ax.legend(loc='upper center', shadow=True)
ax.plot(y_test_mean_shifted[900:1000], 'r--', label='Real data')
ax.plot(predicted_mean_shifted[900:1000], 'g:', label='Predicted')
fig.text(0.4, 0.2, txt, horizontalalignment='center', verticalalignment='center', transform = ax.transAxes)
plt.savefig(os.path.join('cern_figures', 'testplot' + str(txt) + '.png'))
plt.show()
except Exception as e:
print str(e)
print 'Training duration (s) : ', time.time() - global_start_time
return model, y_test, predicted
# main
if __name__ == "__main__":
_, y_test_out, predicted_out = run_network()
#y_test_out_mean = y_test_out - np.mean(y_test_out)
#predicted_out_mean = predicted_out - np.mean(predicted_out)