9. Seq2Seq Calculator
Data Overview
- Uses a dataset generated directly with random values
- train_text
  - 23 + 13
  - 1 - 3
  - 32 + 5
  - 1 + 6
  - …
- train_answer
  - 36
  - -2
  - 37
  - 7
  - …
Final Goal
- Given an arithmetic expression such as "12 + 12", the trained seq2seq model should decode the answer ("24")
Preprocessing
"""
데이터셋 생성
"""
import random
def make_raw_text(count = 50000):
text_dataset = []
answer_dataset = []
for _ in range(count):
t = random.randint(0, 3) # 0 ~ 3
if t == 0:
a = random.randint(0, 10) # 0 ~ 10
else:
a = random.randint(0, 100) # 0 ~ 100
t = random.randint(0, 3):
if t == 0:
b = random.randint(0, 10)
else:
b = random.randint(0, 100)
if random.randint(0, 2) == 0:
train = f'{a} + {b}'
answer = f'{a + b}'
else:
train = f'{a} - {b}'
answer = f'{a = b}'
text_dataset.append(train)
answer_dataset.appen(answer)
return text_dataset, answer_dataset
text_dataset, answer_dataset = make_raw_text()
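A quick sanity check on the generated pairs (output varies per run since the data is random):

for text, answer in zip(text_dataset[:3], answer_dataset[:3]):
    print(f'{text} = {answer}')  # e.g. 23 + 13 = 36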
"""
token 생성
text_dataset, answer_dataset에서 나오는 모든 token을 dict<token, id>로 생성
단 + -> 10, - -> 11, PAD -> 12, EOS -> 13
"""
token = {str{i}: i for in range(10)} # dict
token.update({'+':10, '-':11, 'PAD':12, 'EOS':13})
"""
{'0': 0,
'1': 1,
'2': 2,
'3': 3,
'4': 4,
'5': 5,
'6': 6,
'7': 7,
'8': 8,
'9': 9,
'+': 10,
'-': 11,
'PAD': 12,
'EOS': 13}
"""
inv_token = {value:key for key, value in token.items()}
"""
{0: '0',
1: '1',
2: '2',
3: '3',
4: '4',
5: '5',
6: '6',
7: '7',
8: '8',
9: '9',
10: '+',
11: '-',
12: 'PAD',
13: 'EOS'}
"""
"""
Bag of Words 형태로 변경
단어들의 순서는 전혀 고려하지 않고, 단어들의 출현 빈도에만 집중하는 텍스트 데이터의 수치화 표현 방법
"""
def plain2bow(text, token):
return np.array([vocab[char] for word in text.split() for char in word] + [token['EOS']])
train_text = [plain2bow(text, token) for text in text_dataset]
train_answer = [plain2bow(text, token) for text in answer_dataset]
print(text_dataset[0]) # '50 + 9'
print(train_text[0]) # [5 0 10 9 13]
print(answer_dataset[0]) # '59'
print(train_answer[0]) # [5 9 13]
"""
주어진 시퀀스들의 길이 동일하게 맞추기위해 Padding 추가
"""
# (500000, 8)
train_text = tf.keras.preprocessing.sequence.pad_sequences(train_text,
value = token['PAD'])
# (50000, 5)
train_answer = tf.keras.preprocessing.sequence.pad_sequences(train_answer,
padding = 'post',
value = token['PAD'])
"""
tf.keras.preprocessing.sequence.pad_sequences
- 주어진 시퀀스들의 길이를 동일하게 맞추기 위해 패딩 추가
- 기본적으로 각 시퀀스의 앞쪽에 패딩이 추가되며 값은 0
- 반환 값은 np.array
value = vocab['PAD]
- value 인자를 사용해 패딩 시 사용할 값을 지정할 수 있다.
여기서는 vocab 딕셔너리의 'PAD'에 해당하는 값(13)을 패딩값으로 사용
padding = 'post'
- 패딩을 추가하는 위치를 지정할 수 있다.
- 기본값은 'pre'로서 시퀀스의 앞쪽에 패딩이 추가된다.
- 'post'를 지정하면 시퀀스의 뒷쪽에 패딩이 추가된다.
"""
print(train_text[0]) # [12 12 12 5 0 10 9 13]
print(train_answer[0]) # [5 9 13 12 12]
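To make the pre/post difference concrete, a minimal sketch with a toy sequence:

demo = [[1, 2, 3]]
print(tf.keras.preprocessing.sequence.pad_sequences(demo, maxlen = 5, value = 12))
# [[12 12  1  2  3]]  <- default padding = 'pre'
print(tf.keras.preprocessing.sequence.pad_sequences(demo, maxlen = 5, value = 12,
                                                    padding = 'post'))
# [[ 1  2  3 12 12]]  <- padding = 'post'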
"""
train 과 test 데이터셋 을 각각 98%, 2%의 개수로 분리
"""
def getSplit(data, rate):
return int(len(data) * rate)
upper_bound = getSplit(train_text, 0.98)
train_text_data = train_text[:upper_bound]
train_answer_data = train_answer[:upper_bound]
test_text_data = train_text[upper_bound:]
test_answer_data = train_answer[upper_bound:]
# (49000, 8) (49000, 5)
print(train_text_data.shape, train_answer_data.shape)
# (1000, 8) (1000, 5)
print(test_text_data.shape, test_answer_data.shape)
"""
train_answer_data, test_answer_data의 각 열마다 PAD를 맨 앞에 추가하고 맨 뒤에 있는 PAD를 하나씩 제거하여 decoder에 주입시킬 데이터 생성
"""
train_shifted_answer_data = np.concatenate([np.ones(shape =
(train_answer_data.shape[0], 1)) * vocab['PAD'],
train_answer_data[...,: -1]], axis = 1)
test_shifted_answer_data = np.concatenate([np.ones(shape =
(test_answer_data.shape[0], 1)) * vocab['PAD'],
test_answer_data[...,:-1]], axis = 1)
print(train_shifted_answer_data[0])
# [12. 5. 9. 13. 12.]
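The shift implements teacher forcing: at each step the decoder is fed the previous target token and must predict the current one. The alignment for the first sample can be printed directly:

# decoder input : PAD  5   9  EOS PAD
# decoder target:  5   9  EOS PAD PAD
for inp, tgt in zip(train_shifted_answer_data[0], train_answer_data[0]):
    print(f'{inv_token[int(inp)]:>4} -> {inv_token[int(tgt)]}')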
"""
정답으로 처리될 train/test_answer_data 를 onehot인코딩으로 바꾸기
"""
print(train_answer_data[0]) # [5, 9, 13, 12, 12]
train_answer_onehot = tf.keras.utils.to_categorical(train_answer_data,
len(token))
test_answer_onehot = tf.keras.utils.to_categorical(test_answer_data,
len(token))
print(train_answer_onehot[0])
"""
[[0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0.]]
"""
"""
onehot 인코딩된 데이터를 읽기 쉽게 만드는 함수
"""
def onehot2text(onehot):
return "".join(inv_token[step.argmax()] for step in onehot).replace(
'EOS', '.').replace('PAD', '')
print(onehot2text(train_answer_onehot[0]))
"""
[[0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0.]]
59.
"""
"""
bow 데이터를 읽기 쉽게 만드는 함수
"""
def bow2text(bow):
return "".join(inv_token[step] for step in bow).replace(
'EOS', '.').replace('PAD', '.')
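For example, applied to the padded first training sample from above:

print(bow2text(train_text[0]))  # '50+9.' (PAD stripped, EOS rendered as '.')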
Modeling
from keras.layers import Input, Embedding, GRU, Dense
from keras.models import Model

def seq2seq():
    # encoder
    encoder_input = Input(shape = (8, ))
    embedding = Embedding(len(token), output_dim = 5)  # (14, 5)
    x = embedding(encoder_input)                       # x = (8, 5)
    context_vector = GRU(units = 16)(x)
    encoder = Model(encoder_input, context_vector)
    # decoder: training path, which receives the PAD-shifted answer sequence
    decoder_input = Input(shape = (5, ))
    y = embedding(decoder_input)
    gru = GRU(units = 16, return_sequences = True)
    y = gru(y, initial_state = context_vector)
    softmax = Dense(units = len(token), activation = 'softmax')
    y = softmax(y)
    # decoder: inference path that feeds its own output on to the next state
    next_decoder_input = Input(shape = (1, ))
    next_decoder_embedded = embedding(next_decoder_input)
    decoder_initial_state = Input(shape = (16, ))  # context vector
    decoder_gru_output = gru(next_decoder_embedded,
                             initial_state = decoder_initial_state)
    decoder_softmax_output = softmax(decoder_gru_output)
    decoder = Model([next_decoder_input, decoder_initial_state],
                    [decoder_softmax_output, decoder_gru_output])
    # full training model
    model = Model([encoder_input, decoder_input], y)
    model.compile(loss = 'categorical_crossentropy',
                  optimizer = 'adam', metrics = ['accuracy'])
    return model, encoder, decoder
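The later cells assume the three returned models have been unpacked into model, encoder, and decoder; an optional summary() call confirms that all three share the same Embedding, GRU, and Dense weights:

model, encoder, decoder = seq2seq()
model.summary()  # the shared Embedding / GRU / Dense layers each appear once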
"""
학습
"""
hist = model.fit(
[train_text_data, train_shifted_answer_data], train_answer_onehot,
validation_data = (
[test_text_data, test_shifted_answer_data], test_answer_onehot
),
verbose = 1,
epochs = 60
)
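To judge whether 60 epochs is enough, the returned history can be plotted. A minimal sketch assuming matplotlib is installed (older Keras versions use the keys 'acc' / 'val_acc' instead):

import matplotlib.pyplot as plt

plt.plot(hist.history['accuracy'], label = 'train acc')
plt.plot(hist.history['val_accuracy'], label = 'val acc')
plt.xlabel('epoch')
plt.legend()
plt.show()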
"""
encoder 에서 context vector 받아보기
"""
bow = plain2bow("11 + 11", token)
bow = keras.preprocessing.sequence.pad_sequences(bow[np.newaxis,:],
value = token['PAD'], maxlen = 8)
context_vector = encoder(bow)
print(bow.shape, bow)
# (1, 8) [[12 12 1 1 10 1 1 13]]
print(context_vector)
"""
tf.Tensor(
[[-0.56397927 -0.03953929 0.9995534 0.09812198 0.9999832 -0.31743038
-0.1387286 -0.6269738 -0.3473265 -0.9988625 -0.99112797 -0.9896977
0.9605613 -0.9241668 0.84268594 0.99998635]], shape=(1, 16), dtype=float32)
"""
"""
context_vector와 PAD를 이용해 첫번째 디코딩 데이터 값 확인
"""
init = np.array([token['PAD']])
print(init) # [12]
first_pred, decoder_first_context_vector = decoder([init, context_vector])
print(first_pred, decoder_fitst_context_vector)
print(first_pred.numpy().argmax())
"""
[12]
first_pred:
tf.Tensor(
[[[2.13360973e-09 1.54821619e-01 8.45018983e-01 1.54824622e-04
4.96740915e-09 1.01301995e-10 1.97923518e-11 8.15454440e-11
1.67745231e-08 4.54690917e-06 9.38847168e-15 5.23919574e-18
1.10197173e-11 9.49328435e-20]]], shape=(1, 1, 14), dtype=float32)
decoder_first_context_vector:
tf.Tensor(
[[[-0.99986714 -0.99964404 0.33810085 -0.7053886 -0.2710431
0.9948965 -0.9619767 0.7557475 -0.26890483 -0.99958247
-0.9674766 -0.99980193 -0.99446297 -0.74903244 0.9570804
1.0000001 ]]], shape=(1, 1, 16), dtype=float32)
2
"""
"""
첫번째 context_vector와 first_pred 이용해 두번째 디코딩 데이터 값 확인
"""
second_pred, decoder_second_context_vector = decoder([np.array([first_pred.numpy().argmax()]),
decoder_first_context_vector[0]])
print(second_pred, decoder_second_context_vector)
print(second_pred.numpy().argmax())
"""
second_pred:
tf.Tensor(
[[[3.8322559e-01 5.2788037e-01 7.7921629e-02 1.0597830e-02 3.6832041e-04
6.1465435e-06 1.5246128e-07 2.1902353e-09 5.9369447e-11 5.3625333e-13
1.3712573e-13 9.3953389e-14 5.5824013e-16 3.7457952e-09]]], shape=(1, 1, 14), dtype=float32)
decoder_second_context_vector:
tf.Tensor(
[[[-0.99986666 -0.9999177 0.86941653 -0.7601866 0.8241618
-0.854398 0.87542474 -0.9193011 -0.23460813 -0.999997
0.9116286 -0.9999644 -0.99999523 -0.8724707 0.88665026
0.9997462 ]]], shape=(1, 1, 16), dtype=float32)
1
"""
"""
text가 주어지면 PAD가 나올 때 까지 문자를 decoding 하는 함수
"""
def decoding(text):
bow = plain2bow(text, vocab)
bow = tf.keras.preprocessing.seqeunce.pad_sequence(bow[np.newaxis, :],
value = token['PAD'], maxlen = 8)
context_vector = encoder(bow)
word_vec = np.array([token['PAD']])
res = []
while True:
word_vec, context_vector = decoder([word_vec, context_vector])
word = inv_token[word_vec.numpy.argmax()]
if word in ['EOS', 'PAD'] : break
res.append(word)
word_vec = np.array([word_vec.numpy().argmax()])
context_vector = context_vector[0]
answer = ''.join(res)
print(f'{text} = {answer}')
return answer
decoding("12 + 12")
"""
12 + 12 = 24
'24'
"""
"""
모델 평가
"""
model.evaluate([test_text_data, test_shifted_answer_data],
test_answer_onehot, verbose = 1)