Post

9. Seq2Seq 계산기

데이터 소개

  • 직접 랜덤 생성한 Dataset을 사용
    1. train_text
    • 23 + 13
    • 1 - 3
    • 32 + 5
    • 1 + 6
      1. train_answer
    • 36
    • -2
    • 37
    • 63



최종 목표

  • Seq2Seq 기반 모델의 이해
  • word embedding 이해
  • 시계열 데이터 학습 이해



전처리

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
"""
데이터셋 생성
"""

import random 

def make_raw_text(count = 50000):
	text_dataset = []
	answer_dataset = []

	for _ in range(count):
		t = random.randint(0, 3) # 0 ~ 3
		if t == 0:
			a = random.randint(0, 10) # 0 ~ 10
		else:
			a = random.randint(0, 100) # 0 ~ 100

		t = random.randint(0, 3):
		if t == 0:
			b = random.randint(0, 10)
		else:
			b = random.randint(0, 100)

		if random.randint(0, 2) == 0:
			train = f'{a} + {b}'
			answer = f'{a + b}'

		else:
			train = f'{a} - {b}'
			answer = f'{a = b}'
		
		text_dataset.append(train)
		answer_dataset.appen(answer)


	return text_dataset, answer_dataset


text_dataset, answer_dataset = make_raw_text()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
"""
token 생성

text_dataset, answer_dataset에서 나오는 모든 token을 dict<token, id>로 생성
단 + -> 10, - -> 11, PAD -> 12, EOS -> 13
"""

token = {str{i}: i for in range(10)} # dict
token.update({'+':10, '-':11, 'PAD':12, 'EOS':13})

"""
{'0': 0,
 '1': 1,
 '2': 2,
 '3': 3,
 '4': 4,
 '5': 5,
 '6': 6,
 '7': 7,
 '8': 8,
 '9': 9,
 '+': 10,
 '-': 11,
 'PAD': 12,
 'EOS': 13}
"""


inv_token = {value:key for key, value in token.items()}
"""
{0: '0',
 1: '1',
 2: '2',
 3: '3',
 4: '4',
 5: '5',
 6: '6',
 7: '7',
 8: '8',
 9: '9',
 10: '+',
 11: '-',
 12: 'PAD',
 13: 'EOS'}
"""
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
"""
Bag of Words 형태로 변경 
단어들의 순서는 전혀 고려하지 않고, 단어들의 출현 빈도에만 집중하는 텍스트 데이터의 수치화 표현 방법
"""
def plain2bow(text, token):
	return np.array([vocab[char] for word in text.split() for char in word] + [token['EOS']])

train_text = [plain2bow(text, token) for text in text_dataset]
train_answer = [plain2bow(text, token) for text in answer_dataset]

print(text_dataset[0]) # '50 + 9'
print(train_text[0]) # [5 0 10 9 13]

print(answer_dataset[0]) # '59'
print(train_answer[0]) # [5 9 13]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
"""
주어진 시퀀스들의 길이 동일하게 맞추기위해 Padding 추가
"""

# (500000, 8)
train_text = tf.keras.preprocessing.sequence.pad_sequences(train_text, 
												    value = token['PAD'])

# (50000, 5)
train_answer = tf.keras.preprocessing.sequence.pad_sequences(train_answer,
													padding = 'post',
													value = token['PAD'])

"""
tf.keras.preprocessing.sequence.pad_sequences
	- 주어진 시퀀스들의 길이를 동일하게 맞추기 위해 패딩 추가
	- 기본적으로 각 시퀀스의 앞쪽에 패딩이 추가되며 값은 0
	- 반환 값은 np.array

value = vocab['PAD]
	- value 인자를 사용해 패딩 시 사용할 값을 지정할 수 있다.
	  여기서는 vocab 딕셔너리의 'PAD'에 해당하는 값(13)을 패딩값으로 사용

padding = 'post'
	- 패딩을 추가하는 위치를 지정할 수 있다.
	- 기본값은 'pre'로서 시퀀스의 앞쪽에 패딩이 추가된다.
	- 'post'를 지정하면 시퀀스의 뒷쪽에 패딩이 추가된다.
"""

print(train_text[0]) # [12 12 12 5 0 10 9 13]
print(train_answer[0]) # [5 9 13 12 12]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
"""
train 과 test 데이터셋 을 각각 98%, 2%의 개수로 분리
"""

def getSplit(data, rate):
	return int(len(data) * rate)

upper_bound = getSplit(train_text, 0.98)

train_text_data = train_text[:upper_bound]
train_answer_data = train_answer[:upper_bound]

test_text_data = train_text[upper_bound:]
test_answer_data = train_answer[upper_bound:]

# (49000, 8) (49000, 5)
print(train_text_data.shape, train_answer_data.shape) 

# (1000, 8) (1000, 5)
print(test_text_data.shape, test_answer_data.shape)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
"""
train_answer_data, test_answer_data의 각 열마다 PAD를 맨 앞에 추가하고 맨 뒤에 있는 PAD를 하나씩 제거하여 decoder에 주입시킬 데이터 생성
"""

train_shifted_answer_data = np.concatenate([np.ones(shape = 
						(train_answer_data.shape[0], 1)) * vocab['PAD'], 
									train_answer_data[...,: -1]], axis = 1)

test_shifted_answer_data = np.concatenate([np.ones(shape = 
							(test_answer_data.shape[0], 1)) * vocab['PAD'], 
									test_answer_data[...,:-1]], axis = 1)

print(train_shifted_answer_data[0])
# [12. 5. 9. 13. 12.]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
"""
정답으로 처리될 train/test_answer_data 를 onehot인코딩으로 바꾸기
"""

print(train_answer_data[0]) # [5, 9, 13, 12, 12]

train_answer_onehot = tf.keras.utils.to_categorical(train_answer_data, 
																len(token))
test_answer_onehot = tf.keras.utils.to_categorical(test_answer_data,
															   len(token))
print(train_answer_onehot[0])
"""
[[0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0.]]
"""
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
"""
onehot 인코딩된 데이터를 읽기 쉽게 만드는 함수
"""

def onehot2text(onehot):
	return "".join(inv_token[step.argmax()] for step in onehot).replace(
											'EOS', '.').replace('PAD', '')

print(onehot2text(train_answer_onehot[0]))
"""
[[0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0.]]

59.
"""
1
2
3
4
5
6
7
8
"""
bow 데이터를 읽기 쉽게 만드는 함수
"""

def bow2text(bow):
	return "".join(inv_token[step] for step in bow).replace(
											'EOS', '.').replace('PAD', '.')
											



모델링

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from keras.layers import Input, Embedding, GRU
from keras.models import Model

def seq2seq():
	# encoder
	encoder_input = Input(shape = (8, ))
	embedding = Embedding(len(token), output_dim = 5) # (14, 5)
	x = embedding(encoder_input) # x = (8, 5)
	context_vector = GRU(units = 16)(x) 
	encoder = Model(encoder_input, context_vector)

	# decoder (PAD) 받는 부분
	decoder_input = Input(shape = (5, ))
	y = embedding(decoder_input)
	gru = GRU(units = 16, return_sequences = True)
	y = gru(y, initial_state = context_vector)

	softmax = Dense(units = len(token), activation = 'softmax')
	y = softmax(y)

	# decoder 자기 자신한테 나온 결과물을 다음 state에 넘겨주는 부분
	next_decoder_input = Input(shape = (1, ))
	next_decoder_embedded = embedding(next_decoder_input)

	decoder_initial_state = Input(shape = (16, )) # context vector
	decoder_gru_output = gru(next_decoder_embedded, 
									 initial_state = decoder_initial_state)
	decoder_softmax_output = softmax(decoder_gru_output)

	decoder = Model([next_decoder_input, decoder_initial_state],
							[decoder_softmax_output, decoder_gru_output])


	# model
	model = Model([encoder_input, decoder_input], y)
	model.compile(loss = 'categorical_crossentropy',
								optimizer = 'adam', metrics = ['accuracy'])


	return model, encoder, decoder
1
2
3
4
5
6
7
8
9
10
11
12
"""
학습
"""

hist = model.fit(
	[train_text_data, train_shifted_answer_data], train_answer_onehot,
	validation_data = (
		[test_text_data, test_shifted_answer_data], test_answer_onehot
	),
	verbose = 1,
	epochs = 60
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
"""
encoder 에서 context vector 받아보기
"""

bow = plain2bow("11 + 11", token)
bow = keras.preprocessing.sequence.pad_sequences(bow[np.newaxis,:], 
										value = token['PAD'], maxlen = 8)
context_vector = encoder(bow)

print(bow.shape, bow) 
# (1, 8) [[12 12 1 1 10 1 1 13]]
print(context_vector)
"""
tf.Tensor(
[[-0.56397927 -0.03953929  0.9995534   0.09812198  0.9999832  -0.31743038
  -0.1387286  -0.6269738  -0.3473265  -0.9988625  -0.99112797 -0.9896977
   0.9605613  -0.9241668   0.84268594  0.99998635]], shape=(1, 16), dtype=float32)
"""
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
"""
context_vector와 PAD를 이용해 첫번째 디코딩 데이터 값 확인
"""

init = np.array([token['PAD']])
print(init) # [12]

first_pred, decoder_first_context_vector = decoder([init, context_vector])
print(first_pred, decoder_fitst_context_vector)
print(first_pred.numpy().argmax())

"""
[12]

first_pred:
tf.Tensor(
[[[2.13360973e-09 1.54821619e-01 8.45018983e-01 1.54824622e-04
   4.96740915e-09 1.01301995e-10 1.97923518e-11 8.15454440e-11
   1.67745231e-08 4.54690917e-06 9.38847168e-15 5.23919574e-18
   1.10197173e-11 9.49328435e-20]]], shape=(1, 1, 14), dtype=float32) 

decoder_fitst_context_vector:
tf.Tensor(
[[[-0.99986714 -0.99964404  0.33810085 -0.7053886  -0.2710431
    0.9948965  -0.9619767   0.7557475  -0.26890483 -0.99958247
   -0.9674766  -0.99980193 -0.99446297 -0.74903244  0.9570804
    1.0000001 ]]], shape=(1, 1, 16), dtype=float32)

2
"""
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
"""
첫번째 context_vector와 first_pred 이용해 두번째 디코딩 데이터 값 확인
"""

second_pred, decoder_second_context_vector = decoder([np.array([first_pred.numpy().argmax()]), 
										decoder_first_context_vector[0]])

print(second_pred, decoder_second_context_vector)
print(second_pred.numpy().argmax())

"""
second_pred:
tf.Tensor(
[[[3.8322559e-01 5.2788037e-01 7.7921629e-02 1.0597830e-02 3.6832041e-04
   6.1465435e-06 1.5246128e-07 2.1902353e-09 5.9369447e-11 5.3625333e-13
   1.3712573e-13 9.3953389e-14 5.5824013e-16 3.7457952e-09]]], shape=(1, 1, 14), dtype=float32) 

decoder_second_context_vector:
tf.Tensor(
[[[-0.99986666 -0.9999177   0.86941653 -0.7601866   0.8241618
   -0.854398    0.87542474 -0.9193011  -0.23460813 -0.999997
    0.9116286  -0.9999644  -0.99999523 -0.8724707   0.88665026
    0.9997462 ]]], shape=(1, 1, 16), dtype=float32)

1
"""
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
"""
text가 주어지면 PAD가 나올 때 까지 문자를 decoding 하는 함수
"""

def decoding(text):
	bow = plain2bow(text, vocab)
	bow = tf.keras.preprocessing.seqeunce.pad_sequence(bow[np.newaxis, :], 
										value = token['PAD'], maxlen = 8)

	context_vector = encoder(bow)
	word_vec = np.array([token['PAD']])

	res = []
	while True:
		word_vec, context_vector = decoder([word_vec, context_vector])
		word = inv_token[word_vec.numpy.argmax()]

		if word in ['EOS', 'PAD'] : break

		res.append(word)

		word_vec = np.array([word_vec.numpy().argmax()])
		context_vector = context_vector[0]
	
	answer = ''.join(res)
	print(f'{text} = {answer}')
	
	return answer


decoding("12 + 12")

"""
12 + 12 = 24

'24'
"""
1
2
3
4
5
6
7
"""
모델 평가
"""

model.evaluate([test_text_data, test_shifted_answer_data],
										   test_answer_onehot, verbose = 1)

This post is licensed under CC BY 4.0 by the author.