[NLP] BERT를 이용한 한국어 금융 뉴스 이진 분류 실습

2023. 11. 7. 17:57ML&DL/NLP

 

허깅페이스의 pre-trained model인 BERT를 이용해 실습해보겠습니다.

 

- 목표 : 한국어 금융 텍스트를 이용하여 뉴스 긍정, 부정, 중립 3가지 레이블 분류

 

https://huggingface.co/

 

Hugging Face – The AI community building the future.

 

huggingface.co

허깅페이스(HugginFace)

- pytorch, tensorflow 모두 가능

- 모델 업로드, 전처리, 학습, 평가 및 테스트 코드 모두 제공

- 다양한 사전 학습 언어 모델을 제공하고 있음. 모델의 Hub 로서의 역할. 

=> ko, kor를 입력하면 한국어 모델 Search 가능 

 

- 토크나이저 : 모델마다 사용해야하는 토크나이저가 존재함.

- pipeline : 모델 학습 후에 테스트 단계에서 사용하는 도구 

=> 모델과 토크나이저. 어떤 태스크를 수행하는지 명시

 

 

💻 코드 

라이브러리 설치 / 불러오기

!pip install transformers
!pip install datasets
import pandas as pd
import numpy as np
import random
import time
import datetime
from tqdm import tqdm

import csv
import os

import tensorflow as tf
import torch

# BERT 사용을 위함
from transformers import BertTokenizer
from transformers import BertForSequenceClassification, AdamW, BertConfig
from transformers import get_linear_schedule_with_warmup
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler

# for padding
from tensorflow.keras.preprocessing.sequence import pad_sequences 

# 전처리 및 평가 지표
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, roc_auc_score, accuracy_score, hamming_loss

 

데이터셋 불러오기

!wget https://raw.githubusercontent.com/ukairia777/finance_sentiment_corpus/main/finance_data.csv
import pandas as pd
df = pd.read_csv('finance_data.csv')
df.head()

 

- 3개의 레이블이 있는 것을 확인할 수 있음 - 중립(neutral), 긍정(positive), 부정(negative) 

 

df['labels'] = df['labels'].replace(['neutral', 'positive', 'negative'],[0, 1, 2])
df.head()

 

중립 - 0 , 긍정 - 1 , 부정 -2 로 변환해줍니다.

 

변환 데이터 저장 / 불러오기

df.to_csv('finance_data.csv', index=False, encoding='utf-8-sig')
from datasets import load_dataset

all_data = load_dataset(
        "csv",
        data_files={
            "train": "finance_data.csv",
        },
    )
all_data

 

train/ test 데이터 8:2 분리 

cs = all_data['train'].train_test_split(0.2)
train_cs = cs["train"]
test_cs = cs["test"]



train/val 데이터 8:2 분리 

# 훈련 데이터를 다시 8:2로 분리 후 훈련 데이터와 검증 데이터로 저장
cs = train_cs.train_test_split(0.2)
train_cs = cs["train"]
valid_cs = cs["test"]

 

샘플 출력해보기

print('두번째 샘플 출력 :', train_cs['kor_sentence'][1])
print('두번째 샘플의 레이블 출력 :', train_cs['labels'][1])
두번째 샘플 출력 : 미드나잇트레이더 전체 시간 연장 거래 분석 및 뉴스 서비스는 COMTEX를 통해 실시간으로 이용할 수 있다.
두번째 샘플의 레이블 출력 : 0

 

데이터 전처리

 

- BERT 구조에 맞춰주기.

* BERT는 입력 시작을 알리는 토큰 [CLS] 과 입력 종료를 알리는 [SEP] 스페셜 토큰을 사용함.

# 훈련 데이터, 검증 데이터, 테스트 데이터에 대해서 `[CLS] 문장 [SEP]` 구조를 만듭니다.

train_sentences = list(map(lambda x: '[CLS] ' + str(x) + ' [SEP]', train_cs['kor_sentence']))
validation_sentences = list(map(lambda x: '[CLS] ' + str(x) + ' [SEP]', valid_cs['kor_sentence']))
test_sentences = list(map(lambda x: '[CLS] ' + str(x) + ' [SEP]', test_cs['kor_sentence']))
train_labels = train_cs['labels']
validation_labels = valid_cs['labels']
test_labels = test_cs['labels']

 

test_sentences[:5]
['[CLS] 자본 대출의 최대 금액은 30만 유로이고 최소 가입금액은 10,000 유로입니다. [SEP]',
 '[CLS] QPR은 2009년에 70개 이상의 국가에 퍼져 있으며 은행, 제조 회사, 서비스 회사 및 정부 기관을 포함한 다양한 조직을 보유하고 있다. [SEP]',
 '[CLS] 우리의 표준화된 서비스는 이텔라뿐만 아니라 고객들 사이에서도 긍정적인 반응을 얻고 있습니다. [SEP]',
 "[CLS]  UPM-Kymmene은 골드만삭스의 실적 부진에서 '인라인'으로 업그레이드했다. [SEP]",
 '[CLS] 주당순이익(EPS)은 1.37유로로 2.30유로에 비해 감소했다. [SEP]']

- BERT 구조에 맞게 문장 시작과 끝에 [CLS], [SEP] 토큰이 들어간 것을 확인 할 수 있다.

 

 

토큰화

* 모델마다 사용해야하는 토크나이저가 있다. 

* 여기서는 한국어 BERT 토크나이저 중 하나인 'klue-bert-base'를 사용.

# 한국어 BERT 중 하나인 'klue/bert-base'를 사용.
tokenizer = BertTokenizer.from_pretrained('klue/bert-base')

* 토큰화 후 train,val,test 데이터를 정수 인코딩한 데이터, 레이블, 어텐션 마스크 저장

MAX_LEN = 128

def data_to_tensor (sentences, labels):
  # 정수 인코딩 과정. 각 텍스트를 토큰화한 후에 Vocabulary에 맵핑되는 정수 시퀀스로 변환한다.
  # ex) ['안녕하세요'] ==> ['안', '녕', '하세요'] ==> [231, 52, 45]
  tokenized_texts = [tokenizer.tokenize(sent) for sent in sentences]
  input_ids = [tokenizer.convert_tokens_to_ids(x) for x in tokenized_texts]

  # pad_sequences는 패딩을 위한 모듈. 주어진 최대 길이를 위해서 뒤에서 0으로 채워준다.
  # ex) [231, 52, 45] ==> [231, 52, 45, 0, 0, 0]
  input_ids = pad_sequences(input_ids, maxlen=MAX_LEN, dtype="long", truncating="post", padding="post") 

  attention_masks = []

  for seq in input_ids:
      seq_mask = [float(i > 0) for i in seq]
      attention_masks.append(seq_mask)

  tensor_inputs = torch.tensor(input_ids)
  tensor_labels = torch.tensor(labels)
  tensor_masks = torch.tensor(attention_masks)

  return tensor_inputs, tensor_labels, tensor_masks
train_inputs, train_labels, train_masks = data_to_tensor(train_sentences, train_labels)
validation_inputs, validation_labels, validation_masks = data_to_tensor(validation_sentences, validation_labels)
test_inputs, test_labels, test_masks = data_to_tensor(test_sentences, test_labels)
print(train_inputs[0])
print(train_masks[0])

 

=>train_inputs :  토큰화된 텍스트의 정수 인코딩 된 값. 처음과 끝의 2,3은 [CLS] [SEP] 스페셜 토큰 값임.

=> train_masks : 텍스트가 있는 부분은 1 , 0은 없는 부분 max_len = 128 로 지정해줬기 때문에 128개의 인코딩 값이 저장됨

 

* 병렬 처리를 위해 데이터로더 지정

* 배치 크기 32 = > 배치 단위로 데이터를 가져올 수 있음

batch_size = 32

train_data = TensorDataset(train_inputs, train_masks, train_labels)
train_sampler = RandomSampler(train_data)
train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=batch_size)

validation_data = TensorDataset(validation_inputs, validation_masks, validation_labels)
validation_sampler = SequentialSampler(validation_data)
validation_dataloader = DataLoader(validation_data, sampler=validation_sampler, batch_size=batch_size)

test_data = TensorDataset(test_inputs, test_masks, test_labels)
test_sampler = RandomSampler(test_data)
test_dataloader = DataLoader(test_data, sampler=test_sampler, batch_size=batch_size)
print('훈련 데이터의 크기:', len(train_labels))
print('검증 데이터의 크기:', len(validation_labels))
print('테스트 데이터의 크기:', len(test_labels))

 

* 학습을 위해 GPU 셋팅

 if torch.cuda.is_available():    
    device = torch.device("cuda")
    print('There are %d GPU(s) available.' % torch.cuda.device_count())
    print('We will use the GPU:', torch.cuda.get_device_name(0))
else:
    device = torch.device("cpu")
    print('No GPU available, using the CPU instead.')

 

모델 불러오기

num_labels = 3

model = BertForSequenceClassification.from_pretrained("klue/bert-base", num_labels=num_labels)
model.cuda()

=> 3개의 클래스(중립 : 0 , 긍정 : 1, 부정: 2 )로 분류를 하기위해 num_labels = 3  

 

* 옵티마이저 : 아담, 학습률 지정 

# 옵티마이저 선택
optimizer = AdamW(model.parameters(),
                  lr = 2e-5,
                  eps = 1e-8
                )

 

* 학습 지정

# 몇 번의 에포크(전체 데이터에 대한 학습 횟수)를 할 것인지 선택
epochs = 2
total_steps = len(train_dataloader) * epochs
scheduler = get_linear_schedule_with_warmup(optimizer, 
                                            num_warmup_steps = 0,
                                            num_training_steps = total_steps)

 

def format_time(elapsed):
    elapsed_rounded = int(round((elapsed)))
    return str(datetime.timedelta(seconds=elapsed_rounded))  # hh:mm:ss

 

* 성능 평가 지표 : acuraccy, f1-score

def metrics(predictions, labels):
    y_pred = predictions
    y_true = labels

    # 사용 가능한 메트릭들을 사용한다.
    accuracy = accuracy_score(y_true, y_pred)
    f1_macro_average = f1_score(y_true=y_true, y_pred=y_pred, average='macro', zero_division=0)
    f1_micro_average = f1_score(y_true=y_true, y_pred=y_pred, average='micro', zero_division=0)
    f1_weighted_average = f1_score(y_true=y_true, y_pred=y_pred, average='weighted', zero_division=0)

    # 메트릭 결과에 대해서 리턴
    metrics = {'accuracy': accuracy,
               'f1_macro': f1_macro_average,
               'f1_micro': f1_micro_average,
               'f1_weighted': f1_weighted_average}

    return metrics

* 모델 학습

# 랜덤 시드값.
seed_val = 777
random.seed(seed_val)
np.random.seed(seed_val)
torch.manual_seed(seed_val)
torch.cuda.manual_seed_all(seed_val)

model.zero_grad()
for epoch_i in range(0, epochs):
    print('======== Epoch {:} / {:} ========'.format(epoch_i + 1, epochs))
    t0 = time.time()
    total_loss = 0

    model.train()

    for step, batch in tqdm(enumerate(train_dataloader)):
        if step % 500 == 0 and not step == 0:
            elapsed = format_time(time.time() - t0)
            print('  Batch {:>5,}  of  {:>5,}.    Elapsed: {:}.'.format(step, len(train_dataloader), elapsed))

        batch = tuple(t.to(device) for t in batch)
        b_input_ids, b_input_mask, b_labels = batch

        outputs = model(b_input_ids, 
                        token_type_ids=None, 
                        attention_mask=b_input_mask, 
                        labels=b_labels)
        
        loss = outputs[0]
        total_loss += loss.item()
        loss.backward()

        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)  # gradient clipping if it is over a threshold
        optimizer.step()
        scheduler.step()

        model.zero_grad()

    avg_train_loss = total_loss / len(train_dataloader)            

    print("")
    print("  Average training loss: {0:.4f}".format(avg_train_loss))
    print("  Training epcoh took: {:}".format(format_time(time.time() - t0)))

* 모델 평가

t0 = time.time()
model.eval()
accum_logits, accum_label_ids = [], []

for batch in validation_dataloader:
    batch = tuple(t.to(device) for t in batch)
    b_input_ids, b_input_mask, b_labels = batch

    with torch.no_grad():
        outputs = model(b_input_ids, 
                        token_type_ids=None, 
                        attention_mask=b_input_mask)

    logits = outputs[0]
    logits = logits.detach().cpu().numpy()
    label_ids = b_labels.to('cpu').numpy()

    for b in logits:
        # 3개의 값 중 가장 큰 값을 예측한 인덱스로 결정
        # ex) [ 3.5134246  -0.30875662 -2.111316  ] ==> 0
        accum_logits.append(np.argmax(b))

    for b in label_ids:
        accum_label_ids.append(b)

accum_logits = np.array(accum_logits)
accum_label_ids = np.array(accum_label_ids)
results = metrics(accum_logits, accum_label_ids)

print("Accuracy: {0:.4f}".format(results['accuracy']))
print("F1 (Macro) Score: {0:.4f}".format(results['f1_macro']))
print("F1 (Micro) Score: {0:.4f}".format(results['f1_micro']))
print("F1 (Weighted) Score: {0:.4f}".format(results['f1_weighted']))

 

* 테스트 평가

t0 = time.time()
model.eval()
accum_logits, accum_label_ids = [], []

for step, batch in tqdm(enumerate(test_dataloader)):
    if step % 100 == 0 and not step == 0:
        elapsed = format_time(time.time() - t0)
        print('  Batch {:>5,}  of  {:>5,}.    Elapsed: {:}.'.format(step, len(test_dataloader), elapsed))

    batch = tuple(t.to(device) for t in batch)
    b_input_ids, b_input_mask, b_labels = batch

    with torch.no_grad():
        outputs = model(b_input_ids, 
                        token_type_ids=None, 
                        attention_mask=b_input_mask)

    logits = outputs[0]
    logits = logits.detach().cpu().numpy()
    label_ids = b_labels.to('cpu').numpy()
    
    for b in logits:
        # 3개의 값 중 가장 큰 값을 예측한 인덱스로 결정
        # ex) [ 3.5134246  -0.30875662 -2.111316  ] ==> 0
        accum_logits.append(np.argmax(b))

    for b in label_ids:
        accum_label_ids.append(b)

accum_logits = np.array(accum_logits)
accum_label_ids = np.array(accum_label_ids)
results = metrics(accum_logits, accum_label_ids)

print("Accuracy: {0:.4f}".format(results['accuracy']))
print("F1 (Macro) Score: {0:.4f}".format(results['f1_macro']))
print("F1 (Micro) Score: {0:.4f}".format(results['f1_micro']))
print("F1 (Weighted) Score: {0:.4f}".format(results['f1_weighted']))

 

* 예측 : 새로운 데이터를 이용해 결과 출력

- 파이프 라인 구축

from transformers import pipeline

 

* retrun_all_scores= True 로 설정하면 모든 레이블의 확률값을 출력해줌

pipe = pipeline("text-classification", model=model.cuda(), tokenizer=tokenizer, device=0, max_length=512,
                return_all_scores=True, function_to_apply='softmax')
result = pipe('SK하이닉스가 매출이 급성장하였다')
print(result)

[[{'label': 'LABEL_0', 'score': 0.06776248663663864}, {'label': 'LABEL_1', 'score': 0.9169609546661377}, {'label': 'LABEL_2', 'score': 0.015276561491191387}]]

 

* 결과만 출력하려면 이 함수 사용

def prediction(text):
  result = pipe(text)
  
  return [label_dict[result[0]['label']]]

 

 

 

* 결과  

 

=> 긍정/부정/중립 잘 구분하는 것을 확인할 수 있음