[Implementation] Sequence to Sequence Learning with Neural Networks


The goal is to implement a German-to-English translation task with the Sequence to Sequence architecture from the paper.

 

 

1. Define the English and German tokenizers

import spacy

spacy_en = spacy.load("en_core_web_sm")
spacy_de = spacy.load("de_core_news_sm") # German model

# Define the tokenizer functions

def tokenizer_de(text):
  """
  The paper showed that reversing the order of the input tokens improves performance, so the tokens are returned in reverse order.
  """
  return [token.text for token in spacy_de.tokenizer(text)][::-1]

def tokenizer_en(text):
  return [token.text for token in spacy_en.tokenizer(text)]
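
As a quick sanity check (assuming both spacy models are installed), the German tokenizer should return its tokens in reverse order while the English tokenizer keeps the original order; the sentences below are only illustrative.

print(tokenizer_de("Guten Morgen, wie geht es dir?"))
# e.g. ['?', 'dir', 'es', 'geht', 'wie', ',', 'Morgen', 'Guten']
print(tokenizer_en("Good morning, how are you?"))
# e.g. ['Good', 'morning', ',', 'how', 'are', 'you', '?']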

 

2. Define the preprocessing to apply with torchtext's Field
Each sentence is wrapped with an <SOS> token at the start and an <EOS> token at the end so the model can learn where sentences begin and end.

import torchtext
from torchtext.legacy.data import Field

SRC = Field(tokenize=tokenizer_de, init_token="<SOS>", eos_token="<EOS>", lower=True)
TRG = Field(tokenize=tokenizer_en, init_token="<SOS>", eos_token="<EOS>", lower=True)

 

3. Dataset Load

Load the Multi30k dataset, apply the tokenization defined in the Fields, and split it into train/valid/test sets.

from torchtext.legacy.datasets import Multi30k

train_dataset, valid_dataset, test_dataset = Multi30k.splits(exts=(".de", ".en"), fields=(SRC, TRG))
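
As a quick check (exact counts depend on the downloaded data), the split sizes and one preprocessed example can be inspected as follows; the src tokens appear reversed because of tokenizer_de.

print(f"train: {len(train_dataset.examples)}, valid: {len(valid_dataset.examples)}, test: {len(test_dataset.examples)}")
print(vars(train_dataset.examples[0])) # {'src': [... reversed German tokens ...], 'trg': [... English tokens ...]}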

 

4. Build the German/English vocabularies from words that appear at least twice

SRC.build_vocab(train_dataset, min_freq=2)
TRG.build_vocab(train_dataset, min_freq=2)
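
The resulting vocabulary sizes and the indices of the special tokens can then be checked (the exact numbers depend on the data); <unk> and <pad> are added to the vocabulary automatically by Field.

print(f"SRC vocab size: {len(SRC.vocab)}, TRG vocab size: {len(TRG.vocab)}")
print(TRG.vocab.stoi[TRG.pad_token], TRG.vocab.stoi[TRG.init_token], TRG.vocab.stoi[TRG.eos_token])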

 

5. To reduce the number of unnecessary padding tokens and speed up training, use BucketIterator so that the sentences in each batch have similar lengths

import torch
from torchtext.legacy.data import BucketIterator

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
BATCH_SIZE = 128

train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    (train_dataset, valid_dataset, test_dataset),
    batch_size=BATCH_SIZE,
    device = device
)
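
To confirm that each batch is padded only up to the length of its own longest sentence, the source tensor shapes of the first few batches can be printed; the sequence length dimension should vary from batch to batch.

for i, batch in enumerate(train_iterator):
    print(batch.src.shape)  # |src_len, batch_size|, src_len differs per batch
    if i == 2:
        break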

 

6. Implement the Encoder

import torch.nn as nn

class Encoder(nn.Module):
    
    def __init__(self, input_dim, embed_dim, hidden_dim, n_layers, dropout_ratio):
        super().__init__()
        
        self.embedding = nn.Embedding(input_dim, embed_dim)
        
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.rnn = nn.LSTM(embed_dim, hidden_dim, n_layers, dropout=dropout_ratio)
        
        self.dropout = nn.Dropout(dropout_ratio)
    
    def forward(self, src):
        # src = |src_len, bs|
        embedded = self.dropout(self.embedding(src)) # embedded = |src_len, bs, embed_dim|
        
        outputs, (hidden, cell) = self.rnn(embedded)
        # outputs = |src_len, bs, hidden_dim|
        # hidden = |n_layers, bs, hidden_dim|
        # cell = |n_layers, bs, hidden_dim|
        
        return hidden, cell
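
A rough shape check with placeholder hyperparameters (not the values used for training below) shows that the encoder maps a |src_len, bs| index tensor to hidden and cell states of shape |n_layers, bs, hidden_dim|.

enc_test = Encoder(input_dim=100, embed_dim=32, hidden_dim=64, n_layers=2, dropout_ratio=0.5)
dummy_src = torch.randint(0, 100, (7, 4))  # |src_len=7, bs=4|
h, c = enc_test(dummy_src)
print(h.shape, c.shape)  # torch.Size([2, 4, 64]) torch.Size([2, 4, 64])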

 

7. Implement the Decoder

class Decoder(nn.Module):
    
    def __init__(self, output_dim, embed_dim, hidden_dim, n_layers, dropout_ratio):
        super().__init__()
        
        self.embedding = nn.Embedding(output_dim, embed_dim)
        
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.rnn = nn.LSTM(embed_dim, hidden_dim, n_layers, dropout=dropout_ratio)
        
        # Difference from the Encoder: an FC layer projecting hidden states to the output vocabulary
        self.output_dim = output_dim
        self.fc_out = nn.Linear(hidden_dim, output_dim)

        self.dropout = nn.Dropout(dropout_ratio)
    
    def forward(self, input, hidden, cell):
        # input = |bs|
        ## only one token is decoded at a time, so the sequence length is 1
        # hidden = |n_layers, bs, hidden_dim|
        # cell = |n_layers, bs, hidden_dim|
        input = input.unsqueeze(0) # unsqueeze along dim 0 so the shape becomes |1, bs|
        
        embedded = self.dropout(self.embedding(input)) # pass through the embedding layer, then apply dropout
        # embedded = |1, bs, embed_dim|
        
        output, (hidden, cell) = self.rnn(embedded, (hidden, cell))
        # output = |1, bs, hidden_dim|
        # hidden = |n_layers, bs, hidden_dim|
        # cell = |n_layers, bs, hidden_dim|
        
        pred = self.fc_out(output.squeeze(0)) # output.squeeze(0) = |bs, hidden_dim|, pred = |bs, output_dim|
        
        return pred, hidden, cell
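
Continuing the shape check above (reusing h, c and the same placeholder dimensions), a single decoding step takes one token per sentence and returns a prediction over the output vocabulary.

dec_test = Decoder(output_dim=120, embed_dim=32, hidden_dim=64, n_layers=2, dropout_ratio=0.5)
dummy_input = torch.randint(0, 120, (4,))  # |bs=4|, one token per sentence
pred, h, c = dec_test(dummy_input, h, c)   # reuse the encoder's hidden/cell states
print(pred.shape)  # torch.Size([4, 120]) = |bs, output_dim|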

 

8. Implement the Seq2Seq model

import random

class Seq2Seq(nn.Module):

  def __init__(self, encoder, decoder, device):
    super().__init__()

    self.encoder = encoder
    self.decoder = decoder
    self.device = device

  def forward(self, src, trg, teacher_forcing_ratio=0.5):
    # src = |src_len, bs|
    # trg = |trg_len, bs|

    hidden, cell = self.encoder(src)

    trg_len = trg.shape[0] # number of target tokens
    batch_size = trg.shape[1] # bs
    trg_vocab_size = self.decoder.output_dim 
    outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(self.device)

    input = trg[0, :] # the first decoder input is the <SOS> token

    for t in range(1, trg_len):
      output, hidden, cell = self.decoder(input, hidden, cell)

      outputs[t] = output # store the prediction produced by the FC layer at this step
      top1 = output.argmax(1) # index of the most probable word

      # decide whether to apply teacher forcing
      teacher_force = random.random() < teacher_forcing_ratio
      input = trg[t] if teacher_force else top1 
    
    return outputs
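
Wiring the two placeholder modules from the shape checks above into Seq2Seq confirms the output shape |trg_len, bs, output_dim|; row 0 stays zero because decoding starts from t=1.

model_test = Seq2Seq(enc_test, dec_test, torch.device('cpu'))
dummy_trg = torch.randint(0, 120, (9, 4))  # |trg_len=9, bs=4|
out = model_test(dummy_src, dummy_trg)
print(out.shape)  # torch.Size([9, 4, 120])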

 

9. Training

INPUT_DIM = len(SRC.vocab)
OUTPUT_DIM = len(TRG.vocab)
ENCODER_EMBED_DIM = 256
DECODER_EMBED_DIM = 256
HIDDEN_DIM = 512
N_LAYERS = 4
ENC_DROPOUT_RATIO = 0.5
DEC_DROPOUT_RATIO = 0.5

Instantiate the model

enc = Encoder(INPUT_DIM, ENCODER_EMBED_DIM, HIDDEN_DIM, N_LAYERS, ENC_DROPOUT_RATIO)
dec = Decoder(OUTPUT_DIM, DECODER_EMBED_DIM, HIDDEN_DIM, N_LAYERS, DEC_DROPOUT_RATIO)

model = Seq2Seq(enc, dec, device).to(device)
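
As a quick sanity check before training (a common helper, not part of the original code), the number of trainable parameters can be counted.

def count_parameters(model):
  return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f"trainable parameters: {count_parameters(model):,}")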

Initialize the model's weight parameters

def init_weights(m):
  for name, param in m.named_parameters():
    nn.init.uniform_(param.data, -0.08, 0.08) # in-place uniform initialization

model.apply(init_weights)

import torch.optim as optim

optimizer = optim.Adam(model.parameters())

# ignore the padding token when computing the loss
TRG_PAD_IDX = TRG.vocab.stoi[TRG.pad_token]
criterion = nn.CrossEntropyLoss(ignore_index=TRG_PAD_IDX)



def train(model, iterator, optimizer, criterion, clip):
  model.train()
  epoch_loss = 0

  for i, batch in enumerate(iterator):
    src = batch.src
    trg = batch.trg

    optimizer.zero_grad()

    output = model(src, trg) # |trg_len, bs, output_dim|
    output_dim = output.shape[-1]

    output = output[1:].view(-1, output_dim) # |(trg_len - 1) * bs, output_dim|, the <SOS> position is dropped
    trg = trg[1:].view(-1) # |(trg_len - 1) * bs|

    loss = criterion(output, trg)
    loss.backward()

    torch.nn.utils.clip_grad_norm_(model.parameters(),clip)

    optimizer.step()

    epoch_loss += loss.item()
  
  return epoch_loss / len(iterator)
  
  
def evaluate(model, iterator, criterion):
  model.eval()
  epoch_loss = 0

  with torch.no_grad():
    for i, batch in enumerate(iterator):
      src = batch.src
      trg = batch.trg

      output = model(src, trg, 0) # turn off teacher forcing during evaluation
      output_dim = output.shape[-1]

      output = output[1:].view(-1, output_dim)
      trg = trg[1:].view(-1)

      loss = criterion(output, trg)

      epoch_loss += loss.item()

  return epoch_loss / len(iterator)


def epoch_time(start_time, end_time):
  elapsed_time = end_time - start_time
  elapsed_mins = int(elapsed_time / 60)
  elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
  return elapsed_mins, elapsed_secs



import time
import math

N_EPOCHS = 20
CLIP = 1
best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):
  start_time = time.time()

  train_loss = train(model, train_iterator, optimizer, criterion, CLIP)
  valid_loss = evaluate(model, valid_iterator, criterion)

  end_time = time.time()
  epoch_mins, epoch_secs = epoch_time(start_time, end_time)

  if valid_loss < best_valid_loss:
    best_valid_loss = valid_loss
    torch.save(model.state_dict(), 'seq2seq_ver1.pt')
  
  print(f"Epoch: {epoch+1} | Time: {epoch_mins}m {epoch_secs}s")
  print(f"Train loss: {train_loss} | Train perplexity: {math.exp(train_loss)}")
  print(f"Valid loss: {valid_loss} | Valid perplexity: {math.exp(valid_loss)}")

Test set evaluation

model.load_state_dict(torch.load('/content/seq2seq_ver1.pt'))

test_loss = evaluate(model, test_iterator, criterion)

print(f'Test loss: {test_loss:.3f} | Test Perplexity: {math.exp(test_loss):.3f}')

 

10. Inference

def translate(sent, src_field, trg_field, model, device, max_len=45):
  model.eval()

  if isinstance(sent, str):
    spacy_de = spacy.load("de_core_news_sm")
    tokens = [token.text.lower() for token in spacy_de(sent)][::-1] # reverse to match tokenizer_de used during training
  else:
    tokens = [token.lower() for token in sent]
  
  # <SOS> <EOS> token
  tokens = [src_field.init_token] + tokens + [src_field.eos_token]
  print(f"soruce token: {tokens}")

  src_indexes = [src_field.vocab.stoi[token] for token in tokens]
  print(f"source token idx: {src_indexes}")

  src_tensor = torch.LongTensor(src_indexes).unsqueeze(1).to(device)

  with torch.no_grad():
    hidden, cell = model.encoder(src_tensor)
  
  trg_indexes = [trg_field.vocab.stoi[trg_field.init_token]] # start with the <SOS> token

  for i in range(max_len):
    # feed the previously predicted token as the next input
    trg_tensor = torch.LongTensor([trg_indexes[-1]]).to(device) 

    with torch.no_grad():
      output, hidden, cell = model.decoder(trg_tensor, hidden, cell)

    pred_token = output.argmax(1).item()
    trg_indexes.append(pred_token)

    if pred_token == trg_field.vocab.stoi[trg_field.eos_token]:
      break
    
  trg_tokens = [trg_field.vocab.itos[i] for i in trg_indexes]

  return trg_tokens


model.load_state_dict(torch.load('/content/seq2seq_ver1.pt'))

example_idx = 10

src = vars(test_dataset.examples[example_idx])['src']
trg = vars(test_dataset.examples[example_idx])['trg']

print(f"Source Dutch: {src}")
print(" ".join(translate(src, SRC, TRG, model, device)))
print(trg)
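
The function can also be called with a raw German string, which goes through the string branch that tokenizes, lowercases, and reverses the input; the sentence here is only an example.

print(" ".join(translate("Ein kleines Mädchen spielt im Park.", SRC, TRG, model, device)))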