
LSTM, Bidirectional, Conv1D

by HJ0216 2023. 1. 30.

Environment — IDE: VS Code, Language: Python

 

Applying LSTM, Bidirectional, and Conv1D to various datasets

 

1. LSTM (Long Short-Term Memory)

: A model introduced to mitigate the long-term dependency problem of the plain RNN.
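
To get a feel for what the gating machinery costs, here is a minimal sketch (toy shapes assumed) comparing parameter counts: an LSTM learns four weight sets (forget, input, and output gates plus the cell candidate), so it carries roughly four times the parameters of a SimpleRNN with the same number of units.

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import SimpleRNN, LSTM

m_rnn = Sequential([SimpleRNN(64, input_shape=(13, 1))])
m_lstm = Sequential([LSTM(64, input_shape=(13, 1))])

print(m_rnn.count_params())   # 64 * (1 + 64 + 1) = 4224
print(m_lstm.count_params())  # 4 * 64 * (1 + 64 + 1) = 16896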

# lstm_boston.py

import numpy as np

from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.layers import Dense, SimpleRNN, LSTM, Dropout, Conv2D, Flatten, MaxPooling2D
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

from sklearn.datasets import load_boston
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import mean_squared_error, r2_score


# 1. Data
dataset = load_boston()

x = dataset.data # input features
y = dataset.target # prediction target

x_train, x_test, y_train, y_test = train_test_split(
    x, y,
    train_size=0.7,
    random_state=123
)

scaler = StandardScaler()
scaler.fit(x_train)
x_train = scaler.transform(x_train)
x_test = scaler.transform(x_test)

print(x_train.shape, x_test.shape) # (354, 13), (152, 13)

x_train = x_train.reshape(354, 13, 1)
x_test = x_test.reshape(152, 13, 1)
# When reshaping, keep timesteps * features equal to the original feature count (13 * 1 = 13)


# 2. Model Construction
model = Sequential()
model.add(LSTM(units=64, input_shape=(13,1)))
model.add(Dense(32, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(16, activation='relu'))
model.add(Dense(8, activation='relu'))
model.add(Dense(1))


# 3. Compile and Training
model.compile(loss='mse', optimizer='adam')

earlyStopping = EarlyStopping(monitor='loss', mode='min', patience=32, restore_best_weights=True, verbose=1)

model.fit(x_train, y_train, epochs=2, callbacks=[earlyStopping], batch_size=2)
# fit on the scaled, reshaped train split, not the raw full x, y


# 4. Evaluation and Prediction
loss = model.evaluate(x_test,y_test)
print("Loss: ", loss)
y_predict = model.predict(x_test)

r2 = r2_score(y_test, y_predict)
print("R2: ", r2)



'''
Result
RMSE:  21.002980488736824
R2:  -4.457572735948721

'''
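
The script prints the MSE loss, while the result above reports RMSE; a minimal sketch of how that RMSE could be derived from the already-imported sklearn metric (using y_test and y_predict from the script above):

rmse = np.sqrt(mean_squared_error(y_test, y_predict))
print("RMSE: ", rmse)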

 

# lstm_fetch_covtype.py

import numpy as np

from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.layers import Dense, SimpleRNN, LSTM, Dropout, Conv2D, Flatten, MaxPooling2D
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

from sklearn.datasets import fetch_covtype
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import accuracy_score, mean_squared_error, r2_score


# 1. Data
dataset = fetch_covtype()
x = dataset.data # input features
y = dataset.target # prediction target

y = to_categorical(y)
y = np.delete(y, 0, axis=1)
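# covtype labels run 1..7, so to_categorical() yields 8 columns with an
# all-zero column for the unused class 0; deleting column 0 leaves shape (n, 7)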

x_train, x_test, y_train, y_test = train_test_split(
    x, y,
    train_size=0.7,
    random_state=123
)

scaler = StandardScaler()
scaler.fit(x_train)
x_train = scaler.transform(x_train)
x_test = scaler.transform(x_test)

print(x_train.shape, x_test.shape)

x_train = x_train.reshape(406708, 54, 1)
x_test = x_test.reshape(174304, 54, 1)


# 2. Model Construction
model = Sequential()
model.add(LSTM(units=64, input_shape=(54,1)))
model.add(Dense(32, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(16, activation='relu'))
model.add(Dense(8, activation='relu'))
model.add(Dense(7, activation='softmax'))


# 3. Compile and Training
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

earlyStopping = EarlyStopping(monitor='loss', mode='min', patience=32, restore_best_weights=True, verbose=1)

model.fit(x_train, y_train, epochs=2, callbacks=[earlyStopping], batch_size=128)


# 4. Evaluation and Prediction
loss, accuracy = model.evaluate(x_test, y_test)
print("loss: ", loss)
print("accuracy: ", accuracy)

y_predict = model.predict(x_test)
y_predict = np.argmax(y_predict, axis=1) # (174304, 7) -> (174304,)
y_test = np.argmax(y_test, axis=1) # (174304, 7) -> (174304,)
# y is one-hot encoded, so both come back as (n_samples, n_classes) before argmax

r2 = r2_score(y_test, y_predict)
print("R2: ", r2)



'''
Result
loss:  0.062044188380241394
acc:  0.9850000143051147
R2:  0.972367969950624

'''
# lstm_mnist.py

import numpy as np

from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.layers import Dense, SimpleRNN, LSTM, Dropout, Conv2D, Flatten, MaxPooling2D
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.datasets import mnist, cifar10, cifar100, fashion_mnist

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import mean_squared_error, r2_score


# 1. Data
(x_train, y_train), (x_test, y_test) = mnist.load_data()

print(x_train.shape, x_test.shape) # (60000, 28, 28) (10000, 28, 28)

x_train=x_train/255.
x_test=x_test/255.

'''
print(np.unique(y_train, return_counts=True))

(array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=uint8),
array([5923, 6742, 5958, 6131, 5842, 5421, 5918, 6265, 5851, 5949], dtype=int64))
'''


# 2. Model Construction
model = Sequential()
model.add(LSTM(units=64, input_shape=(28,28)))
model.add(Dense(32, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dropout(0.2))
model.add(Dense(16, activation='relu'))
model.add(Dense(10, activation='softmax')) # multiclass classification


# 3. Compile and Training
model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['acc'])
# multiclass targets without one-hot encoding -> sparse_categorical_crossentropy
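# e.g. for 3 classes, the label 1 would be:
#   one-hot [0, 1, 0] -> categorical_crossentropy
#   integer 1         -> sparse_categorical_crossentropy
# so mnist's raw integer y_train can be fed without to_categorical()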

earlyStopping = EarlyStopping(monitor='val_loss', mode='min', patience=32, restore_best_weights=True, verbose=1)

model.fit(x_train, y_train,
          validation_split=0.2,
          epochs=2,
          callbacks=[earlyStopping],
          batch_size=512)


# 4. Evaluation and Prediction
result = model.evaluate(x_test, y_test)
print("loss: ", result[0])
print("acc: ", result[1])

y_predict = model.predict(x_test)
y_predict = np.argmax(y_predict, axis=1)

r2 = r2_score(y_test, y_predict)
print("R2: ", r2)



'''
Result
loss:  0.07328376173973083
acc:  0.9812999963760376
R2:  0.9632089716433644

'''

 

 

2. Bidirectional RNN

: A plain sequential model predicts the next step only from what it has seen so far; the Bidirectional RNN was devised to improve on this by also looking at what comes afterwards, so a prediction can use both past and future context.

 I am ? student.

Guessing ? from 'I am' alone is harder than guessing it from 'I am' together with 'student.', so reading the sequence in both directions can give better results.
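
A minimal sketch (toy shapes assumed): Bidirectional runs the wrapped RNN forwards and backwards and concatenates the two outputs, doubling the output width.

import numpy as np
from tensorflow.keras.layers import LSTM, Bidirectional

x = np.random.rand(1, 4, 1)                # (batch, timesteps, features)
print(LSTM(32)(x).shape)                   # (1, 32)
print(Bidirectional(LSTM(32))(x).shape)    # (1, 64): forward + backward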

# biDirectional.py

import numpy as np

from sklearn.model_selection import train_test_split

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Flatten, SimpleRNN, LSTM, GRU, Bidirectional # bidirectional computation
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint


# 1. Data
a = np.array(range(1, 101))
x_predict = np.array(range(96, 106))
# x_predict is all input (x) data, so there is no y to split off

timesteps = 5 # 4 steps for x, 1 step for y

def split_x(dataset, timesteps):
    data_list = [] # empty list to collect the sliding windows
    for i in range(len(dataset) - timesteps + 1):
        # number of windows returned: len(dataset) - timesteps + 1
        # e.g. len 100 with timesteps 5 -> 96 windows
        subset = dataset[i: (i+timesteps)]
        # dataset[0:5], dataset[1:6], dataset[2:7], ...: slices of length timesteps
        data_list.append(subset)
    return np.array(data_list)
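
# Quick check of split_x on a toy array (illustration only):
# split_x(np.array([1, 2, 3, 4, 5]), 3)
# -> [[1 2 3]
#     [2 3 4]
#     [3 4 5]]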

a_split = split_x(a, timesteps)
x_pred_split = split_x(x_predict, timesteps-1)
'''
# The timesteps variable could also be split into timesteps1 and timesteps2:
timesteps1 = 5
timesteps2 = 4

a_split = split_x(a, timesteps1) # windows of 5
x_pred_split = split_x(x_predict, timesteps2) # windows of 4

'''

x = a_split[:, :-1] # all rows, every column except the last
y = a_split[:, -1] # all rows, the last column only
x_predict = x_pred_split[:, :] # all rows, all columns


'''
print(x, y) # x: (96, 4), y: (96,)
x: [1 2 3 4] ... [96 97 98 99]
y: [5 6 ... 99 100]

print(x_predict) # (7, 4)
[[ 96  97  98  99]
 [ 97  98  99 100]
 [ 98  99 100 101]
 [ 99 100 101 102]
 [100 101 102 103]
 [101 102 103 104]
 [102 103 104 105]]
'''

x_train, x_test, y_train, y_test = train_test_split(
    x, y,
    test_size=0.2,
    shuffle= True,
    random_state = 333
)

print(x_train.shape, x_test.shape, x_predict.shape) # (76, 4) (20, 4) (7, 4)

x_train = x_train.reshape(76,4,1)
x_test = x_test.reshape(20,4,1)
x_predict = x_predict.reshape(7,4,1)
# Split first, then reshape each piece into the 3-D
# (samples, timesteps, features) shape that the RNN layers expect


# 2. Model Construction
model = Sequential()
model.add(Bidirectional(LSTM(units=32, return_sequences=True), input_shape=(4,1)))
# Bidirectional is a wrapper, not a layer type of its own, so the RNN layer to wrap must be chosen
# return_sequences=True: emit the output at every timestep (3-D output), so the next recurrent layer can consume it
# If the data fed into an RNN layer is not genuinely sequential, performance can degrade
# The sequence returned by Bidirectional(LSTM) is not guaranteed to be time-series-like, so chaining another RNN does not necessarily improve results
model.add(GRU(32, activation='relu'))
model.add(Flatten())
model.add(Dense(16, activation='relu'))
model.add(Dense(8, activation='relu'))
model.add(Dense(1))
model.summary()
# Bidirectional: roughly twice the computation (and parameters) of the non-bidirectional model


# 3. Compile and Training
model.compile(loss='mse', optimizer='adam')

earlyStopping = EarlyStopping(monitor='loss', mode='min', patience=32, restore_best_weights=True, verbose=1)

model.fit(x_train, y_train, epochs=128, callbacks=[earlyStopping], batch_size=2)


# 4. Evaluation and Prediction
loss = model.evaluate(x_test, y_test)

result = model.predict(x_predict)
print("Predict[100 ... 106]: ", result)



'''
Result(Non-Bi)
[[ 99.99038 ]
 [100.99039 ]
 [101.99034 ]
 [102.99025 ]
 [103.99007 ]
 [104.98979 ]
 [105.989456]]

Result(Bi)
[[ 99.07903 ]
 [ 99.89542 ]
 [100.68855 ]
 [101.46187 ]
 [102.215195]
 [102.94844 ]
 [103.6616  ]]

'''

 

3. Conv1D

: A CNN is mainly composed of convolution layers, pooling layers, and fully connected layers.

The convolution and pooling layers mostly handle extracting salient features and capturing spatial information from the raw data.

: A 1-D CNN is used mainly for time-series or text analysis rather than image analysis.

-> "1-D" means that the convolution kernel and the data sequence it is applied to are one-dimensional.
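
A minimal sketch (toy shapes assumed) of the layer itself: Conv1D slides a 1-D kernel along the timesteps axis; padding='same' preserves the sequence length, while the default 'valid' shortens it by kernel_size - 1.

import numpy as np
from tensorflow.keras.layers import Conv1D

x = np.random.rand(1, 8, 2)  # (batch, steps, channels)
print(Conv1D(16, 2, padding='same')(x).shape)   # (1, 8, 16)
print(Conv1D(16, 2, padding='valid')(x).shape)  # (1, 7, 16)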

# conv1D_california.py

import numpy as np

from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.layers import Dense, SimpleRNN, LSTM, Dropout, Conv2D, Flatten, MaxPooling2D, Conv1D
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.datasets import mnist, cifar10, cifar100, fashion_mnist

from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import mean_squared_error, r2_score


# 1. Data
dataset = fetch_california_housing()

x = dataset.data # input features
y = dataset.target # prediction target

x_train, x_test, y_train, y_test = train_test_split(
    x, y,
    train_size=0.7,
    random_state=123
)

scaler = StandardScaler()
scaler.fit(x_train)
x_train = scaler.transform(x_train)
x_test = scaler.transform(x_test)

print(x_train.shape, x_test.shape) # (14447, 8) (6193, 8)

x_train = x_train.reshape(14447, 4, 2)
x_test = x_test.reshape(6193, 4, 2)
# Any reshape is fine as long as the product of the non-sample dimensions stays the same
# (14447, 8, 1) = (14447, 4, 2) = (14447, 2, 4)


# 2. Model Construction
model = Sequential()
model.add(Conv1D(128, 2, padding='same', input_shape=(4,2))) 
model.add(Conv1D(64, 2, padding='same')) 
model.add(Dropout(0.2))
model.add(Conv1D(32, 2, padding='same'))
model.add(Flatten())
model.add(Dense(16, activation='relu'))
model.add(Dense(1))


# 3. Compile and Training
model.compile(loss='mse', optimizer='adam')

earlyStopping = EarlyStopping(monitor='val_loss', mode='min', patience=32, restore_best_weights=True, verbose=1)

model.fit(x_train, y_train,
          validation_split=0.2,
          epochs=128,
          callbacks=[earlyStopping],
          batch_size=32)


# 4. Evaluation and Prediction
result = model.evaluate(x_test, y_test)
print("loss: ", result)

y_predict = model.predict(x_test)

r2 = r2_score(y_test, y_predict)
print("R2: ", r2)



'''
Result
loss:  0.3196415901184082
R2:  0.7582663893325312

'''
# conv1D_fetch_covtype.py

import pandas as pd
import numpy as np

from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import Dense, SimpleRNN, LSTM, Dropout, Conv2D, Flatten, MaxPooling2D, Conv1D
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.datasets import mnist, cifar10, cifar100, fashion_mnist

from sklearn.datasets import fetch_covtype
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import mean_squared_error, r2_score


# 1. Data
dataset = fetch_covtype()

x = dataset.data # input features
y = dataset.target # prediction target

y = to_categorical(y)
print(y.shape) # (581012, 8)

y = np.delete(y, 0, axis=1)
print(y.shape)  # (581012, 7)

x_train, x_test, y_train, y_test = train_test_split(
    x, y,
    train_size=0.7,
    random_state=123
)

scaler = StandardScaler()
scaler.fit(x_train)
x_train = scaler.transform(x_train)
x_test = scaler.transform(x_test)

print(x_train.shape, x_test.shape) # (406708, 54) (174304, 54)

x_train = x_train.reshape(406708, 9, 6)
x_test = x_test.reshape(174304, 9, 6)


# 2. Model Construction
model = Sequential()
model.add(Conv1D(128, 2, padding='same', input_shape=(9,6))) 
model.add(Conv1D(64, 2, padding='same')) 
model.add(Dropout(0.2))
model.add(Conv1D(32, 2, padding='same'))
model.add(Flatten())
model.add(Dense(16, activation='relu'))
model.add(Dense(7, activation='softmax'))


# 3. Compile and Training
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

earlyStopping = EarlyStopping(monitor='val_loss', mode='min', patience=32, restore_best_weights=True, verbose=1)

model.fit(x_train, y_train,
          validation_split=0.2,
          epochs=128,
          callbacks=[earlyStopping],
          batch_size=256)


# 4. Evaluation and Prediction
loss, accuracy = model.evaluate(x_test, y_test)
print("loss: ", loss)
print("accuracy: ", accuracy)

y_predict = model.predict(x_test)
y_predict = np.argmax(y_predict, axis=1) # (174304, 7) -> (174304,)
y_test = np.argmax(y_test, axis=1) # (174304, 7) -> (174304,)
# y was one-hot encoded, so both come back as (n_samples, n_classes) before argmax

r2 = r2_score(y_test, y_predict)
print("R2: ", r2)


'''
Result
loss:  0.5182842016220093
accuracy:  0.7794886827468872
R2:  0.42494066843707257

'''
# conv1D_fashion.py

import pandas as pd
import numpy as np

from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import Dense, SimpleRNN, LSTM, Dropout, Conv2D, Flatten, MaxPooling2D, Conv1D
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.datasets import mnist, cifar10, cifar100, fashion_mnist

from sklearn.datasets import fetch_covtype
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import mean_squared_error, r2_score


# 1. Data
(x_train, y_train), (x_test, y_test) = fashion_mnist.load_data()

print(x_train.shape, x_test.shape) # (60000, 28, 28) (10000, 28, 28)

x_train=x_train/255.
x_test=x_test/255.


# 2. Model Construction
model = Sequential()
model.add(Conv1D(128, 2, padding='same', input_shape=(28,28))) 
model.add(Conv1D(64, 2, padding='same')) 
model.add(Dropout(0.2))
model.add(Conv1D(32, 2, padding='same'))
model.add(Flatten())
model.add(Dense(16, activation='relu'))
model.add(Dense(10, activation='softmax'))


# 3. Compile and Training
model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

earlyStopping = EarlyStopping(monitor='val_loss', mode='min', patience=32, restore_best_weights=True, verbose=1)

model.fit(x_train, y_train,
          validation_split=0.2,
          epochs=128,
          callbacks=[earlyStopping],
          batch_size=256)


# 4. Evaluation and Prediction
loss, accuracy = model.evaluate(x_test, y_test)
print("loss: ", loss)
print("accuracy: ", accuracy)

y_predict = model.predict(x_test)
y_predict = np.argmax(y_predict, axis=1) # (10000, 10) -> (10000,)

r2 = r2_score(y_test, y_predict) # y_test = (10000,), y_predict = (10000,)
print("R2: ", r2)



'''
Result
loss:  0.3900017738342285
accuracy:  0.864799976348877
R2:  0.7752727272727273

'''

 

 

 

Source Code

🔗 HJ0216/TIL

 

References

📑 [Deep Learning][NLP] Bidirectional RNN

📑 [PyTorch] Conv1D + LSTM model implementation in PyTorch