Kaggle Hate Speech Detection Source Code

Date: 23.07.11 ~ 23.07.16

Writer: 9tailwolf (doryeon514@gm.gist.ac.kr)

Libraries and Settings

import torch
from torch import nn
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler

import pandas as pd
import numpy as np
import random
import time
import datetime

# Note: transformers' AdamW has been deprecated; torch.optim.AdamW is the recommended replacement.
from transformers import AdamW, get_linear_schedule_with_warmup
from transformers import AutoTokenizer, AutoModel, AutoConfig
from keras_preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split

device = 'mps'
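
The hard-coded 'mps' device assumes an Apple Silicon machine. A small guard (my addition, not part of the original notebook) falls back to CUDA or CPU when MPS is unavailable:

# Optional: pick the best available backend instead of hard-coding 'mps'.
if torch.backends.mps.is_available():
    device = 'mps'
elif torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'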

Tokenizer Function

def tokenizer(sentence):
    # Build a simple word-level vocabulary from a list of sentences:
    # dict_number maps each word to an index, dict_word maps the index back to the word.
    dict_number = {}
    dict_word = {}
    for s in sentence:
        for word in s.split():
            if word not in dict_number:
                dict_number[word] = len(dict_number)
                dict_word[len(dict_number) - 1] = word

    return dict_number, dict_word
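
For example, calling the function on a small list of sentences (illustrative input, not from the notebook) yields paired word-to-index and index-to-word maps:

sentences = ['hello world', 'hello there']
dict_number, dict_word = tokenizer(sentences)
print(dict_number)  # {'hello': 0, 'world': 1, 'there': 2}
print(dict_word)    # {0: 'hello', 1: 'world', 2: 'there'}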

Making Data Function

def make_data(input_ids, attention_masks, labels):
    # Hold out 3% of the data for validation. The same random_state is reused for the
    # second split so the attention masks stay aligned with their input ids.
    train_inputs, validation_inputs, train_labels, validation_labels = train_test_split(
        input_ids, labels, random_state=42, test_size=0.03)
    train_masks, validation_masks, _, _ = train_test_split(
        attention_masks, input_ids, random_state=42, test_size=0.03)

    train_labels = train_labels.tolist()
    validation_labels = validation_labels.tolist()

    # Token ids and masks become integer tensors; labels become float vectors for CrossEntropyLoss.
    train_inputs = torch.LongTensor(train_inputs)
    train_labels = torch.FloatTensor(train_labels)
    train_masks = torch.LongTensor(train_masks)
    validation_inputs = torch.LongTensor(validation_inputs)
    validation_labels = torch.FloatTensor(validation_labels)
    validation_masks = torch.LongTensor(validation_masks)

    BATCH_SIZE = 16

    # Training batches are sampled randomly; validation batches are read sequentially.
    train_data = TensorDataset(train_inputs, train_masks, train_labels)
    train_sampler = RandomSampler(train_data)
    train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=BATCH_SIZE)

    validation_data = TensorDataset(validation_inputs, validation_masks, validation_labels)
    validation_sampler = SequentialSampler(validation_data)
    validation_dataloader = DataLoader(validation_data, sampler=validation_sampler, batch_size=BATCH_SIZE)

    return train_data, train_sampler, train_dataloader, validation_data, validation_sampler, validation_dataloader
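
make_data expects padded token-id arrays, matching attention masks, and one-hot label vectors, and returns the six objects that the training function unpacks below; a hedged sketch of how these inputs might be built from the raw text appears in the Main Function section.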

Training Function

def flat_accuracy(preds, labels):
    # Reduce both the logits and the one-hot labels to class indices before comparing.
    pred_flat = np.argmax(preds, axis=1).flatten()
    labels_flat = np.argmax(labels, axis=1).flatten()
    return np.sum(pred_flat == labels_flat) / len(labels_flat)
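
A quick check of the helper on toy arrays (illustrative values): the first prediction matches its label and the second does not, so the accuracy is 0.5.

preds = np.array([[0.1, 0.8, 0.1], [0.7, 0.2, 0.1]])
labels = np.array([[0., 1., 0.], [0., 0., 1.]])
print(flat_accuracy(preds, labels))  # 0.5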


def format_time(elapsed):
    elapsed_rounded = int(round((elapsed)))
    return str(datetime.timedelta(seconds=elapsed_rounded))
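
For instance, format_time(3725.4) returns '1:02:05'.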
    
def training(model, datasets):
    train_data, train_sampler, train_dataloader, validation_data, validation_sampler, validation_dataloader = datasets

    # AdamW optimizer with a linear decay schedule (no warmup) over all training steps.
    optimizer = AdamW(model.parameters(), lr=4e-5, eps=1e-8)
    epochs = 5
    total_steps = len(train_dataloader) * epochs
    scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)
    loss_fn = nn.CrossEntropyLoss()

    # Fix random seeds for reproducibility.
    seed_val = 100
    random.seed(seed_val)
    np.random.seed(seed_val)
    torch.manual_seed(seed_val)
    torch.cuda.manual_seed_all(seed_val)

    model.zero_grad()
    
    for epoch_i in range(0, epochs):
        t0 = time.time()
        total_loss = 0
        model.train()
        
        for step, batch in enumerate(train_dataloader):
            # Report progress every 10 batches.
            if step % 10 == 0 and step != 0:
                elapsed = format_time(time.time() - t0)
                print('  Batch {:>5,}  of  {:>5,}.    Elapsed: {:}.'.format(step, len(train_dataloader), elapsed))

            # Move the batch to the target device and unpack it.
            batch = tuple(t.to(device) for t in batch)
            b_input_ids, b_input_mask, b_labels = batch
            outputs = model(b_input_ids, attention_mask=b_input_mask)
            loss = loss_fn(outputs, b_labels)
            total_loss += loss.item()

            loss.backward()
            # Clip gradients, then step the optimizer and the learning-rate scheduler.
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            scheduler.step()
            model.zero_grad()

        avg_train_loss = total_loss / len(train_dataloader)

        print("")
        print("  Average training loss: {0:.2f}".format(avg_train_loss))
        print("  Training epcoh took: {:}".format(format_time(time.time() - t0)))
        print("")
        print("Running Validation...")

        t0 = time.time()
        model.eval()

        eval_accuracy = 0
        nb_eval_steps = 0

        for batch in validation_dataloader:
            batch = tuple(t.to(device) for t in batch)
            b_input_ids, b_input_mask, b_labels = batch
            # No gradients are needed during evaluation.
            with torch.no_grad():
                outputs = model(b_input_ids, attention_mask=b_input_mask)
            outputs = outputs.detach().cpu().numpy()
            label_ids = b_labels.to('cpu').numpy()
            eval_accuracy += flat_accuracy(outputs, label_ids)
            nb_eval_steps += 1

        print("  Accuracy: {0:.2f}".format(eval_accuracy/nb_eval_steps))
        print("  Validation took: {:}".format(format_time(time.time() - t0)))

    print("")
    print("Training complete!")
    
    return model

Main Function
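
The KcELECTRA_NN class is not defined in this document. A minimal sketch of what it might look like, assuming its two constructor arguments are the dropout rate and the number of classes and that the backbone is beomi/KcELECTRA-base (this reconstruction is mine, not the author's code):

class KcELECTRA_NN(nn.Module):
    # Hypothetical reconstruction: ELECTRA backbone + dropout + linear classification head.
    def __init__(self, dropout_rate, num_classes):
        super().__init__()
        self.backbone = AutoModel.from_pretrained("beomi/KcELECTRA-base")
        self.dropout = nn.Dropout(dropout_rate)
        self.classifier = nn.Linear(self.backbone.config.hidden_size, num_classes)

    def forward(self, input_ids, attention_mask=None):
        hidden = self.backbone(input_ids, attention_mask=attention_mask).last_hidden_state
        pooled = self.dropout(hidden[:, 0])  # representation of the [CLS] token
        return self.classifier(pooled)       # raw logits of shape (batch, num_classes)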

model = KcELECTRA_NN(0.5, 3).to(device)
tokenizer = AutoTokenizer.from_pretrained("beomi/KcELECTRA-base")
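
# The construction of `datasets` is not shown in the notebook. A plausible sketch, assuming
# a dataframe `df` (hypothetical name) holding the text and label columns and the same
# maxlen=150 padding used in the Test section:
# sentences = ['[CLS] ' + s + ' [SEP]' for s in df['text']]
# tokenized = [tokenizer.convert_tokens_to_ids(tokenizer.tokenize(s)) for s in sentences]
# input_ids = pad_sequences(tokenized, maxlen=150, dtype='long', truncating='post', padding='post')
# attention_masks = [[float(i > 0) for i in ids] for ids in input_ids]
# labels = pd.get_dummies(df['label']).values.astype(float)
# datasets = make_data(input_ids, attention_masks, labels)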
model = training(model, datasets)

Test

def test_model(fmodel, seq):
    # Wrap the input with the tokenizer's special tokens; [CLS]/[SEP] must be upper-case
    # so the tokenizer maps them to the real special-token ids.
    seq = '[CLS] ' + seq + ' [SEP]'
    sentence = tokenizer.tokenize(seq)
    test_ids = tokenizer.convert_tokens_to_ids(sentence)
    test_ids = pad_sequences([test_ids], maxlen=150, dtype='long', truncating='post', padding='post')
    test_mask = [float(i > 0) for i in test_ids[0]]
    test_ids = torch.tensor(test_ids).to(device)
    test_mask = torch.tensor([test_mask]).to(device)
    fmodel.eval()  # make sure dropout is disabled during inference
    with torch.no_grad():
        outputs = fmodel(test_ids, attention_mask=test_mask)
    # Return the index of the largest of the three class logits.
    return np.argmax([outputs[0][0].item(), outputs[0][1].item(), outputs[0][2].item()])
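
A quick sanity check (the example sentence is illustrative, and the notebook does not state which class each of the three indices corresponds to):

print(test_model(model, '좋은 하루 되세요'))  # prints the predicted class index: 0, 1, or 2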