# NanoSocrates/Playgrounds/evaluation.py
import torch
from pathlib import Path
import Project_Model.Libs.BPE as BPE
import Project_Model.Libs.Transformer as Transformer
import Project_Model.Libs.TransformerUtils as TUtils
import Project_Model.Libs.TorchShims as torch_shims
import Project_Model.Libs.Batch as Batch
# set a default device
DEVICE = torch_shims.get_default_device()
torch.set_default_device(DEVICE)
# Get paths
# MODEL_DIR = "Assets/Model/curated"
MODEL_DIR = "Assets/Dataset/Tmp"
VOCABULARY_PATH = Path("Assets/Model/small/bpe-small-16.json")
TRAIN_DATASET_PATH = Path("Assets/Dataset/1-hop/small/holdout/train.csv")
VALIDATION_DATASET_PATH = Path("Assets/Dataset/1-hop/small/holdout/evaluation.csv")
TEST_DATASET_PATH = Path("Assets/Dataset/1-hop/small/holdout/test.csv")
TEST_DATASET_PATH = Path("Assets/Dataset/1-hop/toy/rdf_text.csv")
MODEL_PATH = Path(f"{MODEL_DIR}/NanoSocrates.zip")
# BPE Init
SPECIAL_VOC = BPE.default_special_tokens()
VOCABULARY = BPE.load_nanos_vocabulary(VOCABULARY_PATH)
TOKENANO = BPE.TokeNanoCore(VOCABULARY, SPECIAL_VOC)
# Constants
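# MASK_EXTRA_SPACE reserves sentinel ids above the real vocabulary for span
# masking, so the token space seen by the model is vocabulary_size + MASK_EXTRA_SPACE.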
MASK_EXTRA_SPACE = 100
REAL_TOKEN_SPACE_SIZE = TOKENANO.vocabulary_size
TOKEN_SPACE_SIZE = TOKENANO.vocabulary_size + MASK_EXTRA_SPACE
EMBEDDED_SIZE = 256
FEED_FORWARD_MULTIPLIER = 4
ATTENTION_HEADS = 4
SENTENCE_LENGTH = 256
NUMBER_OF_BLOCKS = 2
SOS_TOKEN = TOKENANO.encode("<SOS>")[0]
PAD_TOKEN = TOKENANO.encode("<PAD>")[0]
END_TOKEN = TOKENANO.encode("<EOS>")[0]
SUBJ_TOKEN = TOKENANO.encode("<SUBJ>")[0]
REL_TOKEN = TOKENANO.encode("<PRED>")[0]
OBJ_TOKEN = TOKENANO.encode("<OBJ>")[0]
MASK_TOKEN = TOKENANO.encode("<MASK>")[0]
SPECIAL_TOKENS: set[int] = set(TOKENANO.encode("".join(BPE.default_special_tokens())))
ALLOWED_TOKENS = {SUBJ_TOKEN, REL_TOKEN, OBJ_TOKEN}
FORBIDDEN_TOKENS = SPECIAL_TOKENS - ALLOWED_TOKENS
# Spanned_Masker
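# FORBIDDEN_TOKENS (all specials except the RDF markers <SUBJ>/<PRED>/<OBJ>)
# are handed to the masker, presumably so masked spans never overwrite them.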
MASKER = Transformer.SpannedMasker(REAL_TOKEN_SPACE_SIZE, FORBIDDEN_TOKENS)
TRAIN_BATCHER = Batch.Batcher(TRAIN_DATASET_PATH, SENTENCE_LENGTH, TOKENANO, MASKER)
VALIDATION_BATCHER = Batch.Batcher(
    VALIDATION_DATASET_PATH, SENTENCE_LENGTH, TOKENANO, MASKER
)
TEST_BATCHER = Batch.Batcher(
    TEST_DATASET_PATH, SENTENCE_LENGTH, TOKENANO, MASKER, debug=True
)
# Model
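# Two views of the same weights: TrainingModel is the shape the checkpoint was
# saved in, NanoSocratesCore runs autoregressive inference; train2inference
# copies the weights across below.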
NANOSOCRATES_TRAIN = Transformer.TrainingModel(
    TOKEN_SPACE_SIZE,
    EMBEDDED_SIZE,
    FEED_FORWARD_MULTIPLIER,
    ATTENTION_HEADS,
    NUMBER_OF_BLOCKS,
)
NANOSOCRATES = Transformer.NanoSocratesCore(
    TOKEN_SPACE_SIZE,
    SENTENCE_LENGTH,
    SOS_TOKEN,
    PAD_TOKEN,
    END_TOKEN,
    EMBEDDED_SIZE,
    FEED_FORWARD_MULTIPLIER,
    ATTENTION_HEADS,
    NUMBER_OF_BLOCKS,
)
if MODEL_PATH.is_file():
    nanosocrates_dict = torch.load(MODEL_PATH, weights_only=True, map_location=DEVICE)
    NANOSOCRATES_TRAIN.load_state_dict(nanosocrates_dict)
_, ENCODER_ONLY, DECODER_ONLY = TUtils.decompose_nano_socrates(
    NANOSOCRATES, TOKEN_SPACE_SIZE, EMBEDDED_SIZE
)
NANOSOCRATES = TUtils.train2inference(NANOSOCRATES_TRAIN, NANOSOCRATES)
NANOSOCRATES.eval()
ENCODER_ONLY.eval()
DECODER_ONLY.eval()
NANOSOCRATES_TRAIN.eval()
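# Per-task metric accumulators, one row per evaluated example:
# task 1 = RDF2TXT, task 2 = TEXT2RDF, task 3 = masking, task 4 = RDF completion.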
task_1_metrics = []
task_2_metrics = []
task_3_metrics = []
task_4_metrics = []
example_num = 0
with torch.no_grad():
    for example in TEST_BATCHER.batch(1):
        print(f"DOING Example: {example_num}")
        example_num += 1
        src_x, tgt_y, pad_x, pad_y, tasktype = example
        enc_x = torch.tensor(src_x)
        ACTUAL_BATCH_SIZE, _ = enc_x.shape
        enc_x_pad = torch.tensor(pad_x, dtype=torch.bool)
        tgt = torch.tensor(tgt_y)
        tgt_pad = torch.tensor(pad_y, dtype=torch.bool)
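        # Teacher-forced decoder input (SOS + target shifted right); kept for parity
        # with training, though inference() below only consumes the encoder input.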
        dec_x = Transformer.get_decoder_input(
            ACTUAL_BATCH_SIZE, SOS_TOKEN, PAD_TOKEN, SENTENCE_LENGTH
        )
        dec_x[:, 1:] = tgt[:, :-1]
        dec_x_pad = dec_x.eq(PAD_TOKEN)
        out: torch.Tensor = NANOSOCRATES.inference((enc_x, enc_x_pad), tasktype)
        tokens: list[int] = out.tolist()[0]
        tokens.append(END_TOKEN)
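        # Map any sentinel mask id above the real vocabulary back to <MASK>
        # so decode() only ever sees ids it knows.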
        tokens = [MASK_TOKEN if x > TOKENANO.vocabulary_size else x for x in tokens]
        out_string = TOKENANO.decode(tokens)
        exp_tokens: list[int] = tgt_y[0]
        exp_tokens = [MASK_TOKEN if x > TOKENANO.vocabulary_size else x for x in exp_tokens]
        exp_string = TOKENANO.decode(exp_tokens)
        enc_tokens: list[int] = src_x[0]
        enc_tokens = [MASK_TOKEN if x > TOKENANO.vocabulary_size else x for x in enc_tokens]
        enc_string = TOKENANO.decode(enc_tokens)
        print(f"PROMPT:\n{enc_string}")
        print(f"EXPECTED:\n{exp_string}")
        print(f"ACTUAL:\n{out_string}")
        if tasktype == Batch.TaskType.RDF2TXT:
            ref = TUtils.remove_padding(exp_tokens, PAD_TOKEN, END_TOKEN)
            pred = TUtils.remove_padding(tokens, PAD_TOKEN, END_TOKEN)
            ref_str = TOKENANO.decode(ref)
            pred_str = TOKENANO.decode(pred)
            bleu, rouge, meteor = TUtils.rdf2txt([ref_str], [pred_str])
            task_1_metrics.append(
                [bleu["bleu"], rouge["rougeL"], meteor["meteor"]]  # type: ignore
            )
        if tasktype == Batch.TaskType.TEXT2RDF:
            ref = TUtils.remove_padding(exp_tokens, PAD_TOKEN, END_TOKEN)
            pred = TUtils.remove_padding(tokens[1:], PAD_TOKEN, END_TOKEN)
            ref, pred = TUtils.balance_paddings(ref, pred, PAD_TOKEN)
            precision, recall = TUtils.txt2rdf(ref, pred)
            task_2_metrics.append(
                [precision["precision"], recall["recall"]]  # type: ignore
            )
        if tasktype == Batch.TaskType.MASKING:
            ref = TUtils.remove_padding(exp_tokens, PAD_TOKEN, END_TOKEN)
            pred = TUtils.remove_padding(tokens, PAD_TOKEN, END_TOKEN)
            ref, pred = TUtils.balance_paddings(ref, pred, PAD_TOKEN)
            accuracy = TUtils.accuracy(ref, pred)
            task_3_metrics.append(accuracy["accuracy"])  # type: ignore
        if tasktype == Batch.TaskType.COMPLETATION:
            ref = TUtils.remove_padding(exp_tokens, PAD_TOKEN, END_TOKEN)
            pred = TUtils.remove_padding(tokens, PAD_TOKEN, END_TOKEN)
            ref, pred = TUtils.balance_paddings(ref, pred, PAD_TOKEN)
            precision, recall = TUtils.txt2rdf(ref, pred)
            task_4_metrics.append(
                [precision["precision"], recall["recall"]]  # type: ignore
            )
bleus = [row[0] for row in task_1_metrics]
rouges = [row[1] for row in task_1_metrics]
meteors = [row[2] for row in task_1_metrics]
prec_1 = [row[0] for row in task_2_metrics]
rec_1 = [row[1] for row in task_2_metrics]
acc = task_3_metrics
prec_2 = [row[0] for row in task_4_metrics]
rec_2 = [row[1] for row in task_4_metrics]
BLEU = TUtils.average(bleus)
ROUGE = TUtils.average(rouges)
METEOR = TUtils.average(meteors)
PREC_1 = TUtils.average(prec_1)
REC_1 = TUtils.average(rec_1)
F1_1 = TUtils.f1(PREC_1, REC_1)
ACC = TUtils.average(acc)
PREC_2 = TUtils.average(prec_2)
REC_2 = TUtils.average(rec_2)
F1_2 = TUtils.f1(PREC_2, REC_2)
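# Pretty-print the evaluation report.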
SEPARATOR = "**************************************************************************"
OUTPUT = "".join([
f"{SEPARATOR}\n",
"*\tRDF2TXT:\n",
f"*\t\tBLEU: {BLEU} - ROUGE: {ROUGE} - METEOR: {METEOR}\n"
f"{SEPARATOR}\n",
"*\tTXT2RDF:\n",
f"*\t\tPRECISION: {PREC_1} - RECALL: {REC_1} - F1: {F1_1}\n"
f"{SEPARATOR}\n",
"*\tRDF Completion 1:\n",
f"*\t\tACCURACY: {ACC}\n"
f"{SEPARATOR}\n",
"*\tRDF Completion 2:\n",
f"*\t\tPRECISION: {PREC_2} - RECALL: {REC_2} - F1: {F1_2}\n"
f"{SEPARATOR}\n",
""
])
print(OUTPUT)
print("\nDEBUG")
print(task_1_metrics)
print(task_2_metrics)
print(task_3_metrics)
print(task_4_metrics)