"""Evaluate a NanoSocrates checkpoint on the held-out test split.

The script builds the tokenizer, the masker, the batchers and the model,
loads the checkpoint at ``MODEL_PATH`` when it exists, runs inference one
example at a time over ``TEST_BATCHER`` and prints, per task type, the
averaged metrics returned by the ``TransformerUtils`` helpers.
"""

import random
from pathlib import Path

import torch

import Project_Model.Libs.BPE as BPE
import Project_Model.Libs.Transformer as Transformer
import Project_Model.Libs.TransformerUtils as TUtils
import Project_Model.Libs.TorchShims as torch_shims
import Project_Model.Libs.Batch as Batch

# set a default device
DEVICE = torch_shims.get_default_device()
torch.set_default_device(DEVICE)

# set a fixed seed
torch.manual_seed(0)
random.seed(0)

# Get paths
MODEL_DIR = "Assets/Model/curated"
# MODEL_DIR= "Assets/Dataset/Tmp"
VOCABULARY_PATH = Path("Assets/Model/small/bpe-small-16.json")
TRAIN_DATASET_PATH = Path("Assets/Dataset/1-hop/small/holdout/train.csv")
VALIDATION_DATASET_PATH = Path("Assets/Dataset/1-hop/small/holdout/evaluation.csv")
TEST_DATASET_PATH = Path("Assets/Dataset/1-hop/small/holdout/test.csv")
# TEST_DATASET_PATH = Path("Assets/Dataset/1-hop/toy/rdf_text.csv")
MODEL_PATH = Path(f"{MODEL_DIR}/NanoSocrates.zip")

# BPE Init
SPECIAL_VOC = BPE.default_special_tokens()
VOCABULARY = BPE.load_nanos_vocabulary(VOCABULARY_PATH)
TOKENANO = BPE.TokeNanoCore(VOCABULARY, SPECIAL_VOC)

# Constants
# MASK_EXTRA_SPACE ids are reserved on top of the tokenizer vocabulary; the
# model is sized with TOKEN_SPACE_SIZE while the masker receives the real size.
MASK_EXTRA_SPACE = 100
REAL_TOKEN_SPACE_SIZE = TOKENANO.vocabulary_size
TOKEN_SPACE_SIZE = TOKENANO.vocabulary_size + MASK_EXTRA_SPACE
EMBEDDED_SIZE = 256
FEED_FORWARD_MULTIPLIER = 4
ATTENTION_HEADS = 4
SENTENCE_LENGTH = 256
NUMBER_OF_BLOCKS = 2

# NOTE(review): every literal below is the empty string in this revision, so
# all eight constants are computed from the same encode("") call. This looks
# like the special-token markers were lost from the literals - confirm the
# intended marker strings before relying on these ids.
SOS_TOKEN = TOKENANO.encode("")[0]
PAD_TOKEN = TOKENANO.encode("")[0]
END_TOKEN = TOKENANO.encode("")[0]
SUBJ_TOKEN = TOKENANO.encode("")[0]
REL_TOKEN = TOKENANO.encode("")[0]
OBJ_TOKEN = TOKENANO.encode("")[0]
MASK_TOKEN = TOKENANO.encode("")[0]
CONTINUTE_TOKEN = TOKENANO.encode("")[0]

# Special tokens the masker must not touch: all of them except the three
# triple markers (subject / relation / object).
SPECIAL_TOKENS: set[int] = set(TOKENANO.encode("".join(BPE.default_special_tokens())))
ALLOWED_TOKENS = {SUBJ_TOKEN, REL_TOKEN, OBJ_TOKEN}
FORBIDDEN_TOKENS = SPECIAL_TOKENS - ALLOWED_TOKENS

# Spanned_Masker
MASKER = Transformer.SpannedMasker(REAL_TOKEN_SPACE_SIZE, FORBIDDEN_TOKENS, average_span=4)

TRAIN_BATCHER = Batch.Batcher(TRAIN_DATASET_PATH, SENTENCE_LENGTH, TOKENANO, MASKER)
VALIDATION_BATCHER = Batch.Batcher(
    VALIDATION_DATASET_PATH, SENTENCE_LENGTH, TOKENANO, MASKER
)
TEST_BATCHER = Batch.Batcher(
    TEST_DATASET_PATH, SENTENCE_LENGTH, TOKENANO, MASKER, debug=True
)

# Model
NANOSOCRATES_TRAIN = Transformer.TrainingModel(
    TOKEN_SPACE_SIZE,
    EMBEDDED_SIZE,
    FEED_FORWARD_MULTIPLIER,
    ATTENTION_HEADS,
    NUMBER_OF_BLOCKS,
)

NANOSOCRATES = Transformer.NanoSocratesCore(
    TOKEN_SPACE_SIZE,
    SENTENCE_LENGTH,
    SOS_TOKEN,
    PAD_TOKEN,
    END_TOKEN,
    CONTINUTE_TOKEN,
    EMBEDDED_SIZE,
    FEED_FORWARD_MULTIPLIER,
    ATTENTION_HEADS,
    NUMBER_OF_BLOCKS,
)

if MODEL_PATH.is_file():
    nanosocrates_dict = torch.load(MODEL_PATH, weights_only=True, map_location=DEVICE)
    NANOSOCRATES_TRAIN.load_state_dict(nanosocrates_dict)
else:
    # Keep going (best effort), but make it obvious that the numbers printed
    # below come from freshly initialised weights.
    print(f"WARNING: checkpoint not found at {MODEL_PATH}; evaluating untrained weights")

_, ENCODER_ONLY, DECODER_ONLY = TUtils.decompose_nano_socrates(
    NANOSOCRATES, TOKEN_SPACE_SIZE, EMBEDDED_SIZE
)

NANOSOCRATES = TUtils.train2inference(NANOSOCRATES_TRAIN, NANOSOCRATES)

NANOSOCRATES.eval()
ENCODER_ONLY.eval()
DECODER_ONLY.eval()
NANOSOCRATES_TRAIN.eval()


def _collapse_mask_ids(token_ids: list[int]) -> list[int]:
    """Return ``token_ids`` with every id above the vocabulary size set to MASK_TOKEN.

    NOTE(review): the comparison is strict (``>``), so an id equal to
    ``TOKENANO.vocabulary_size`` is passed through unchanged. Confirm whether
    that id is a real token or whether ``>=`` was intended.
    """
    limit = TOKENANO.vocabulary_size
    return [MASK_TOKEN if token > limit else token for token in token_ids]


def _trim(token_ids: list[int]) -> list[int]:
    """Apply ``TUtils.remove_padding`` with this script's PAD and END tokens."""
    return TUtils.remove_padding(token_ids, PAD_TOKEN, END_TOKEN)


def _precision_recall(expected: list[int], predicted: list[int]) -> list:
    """Return ``[precision, recall]`` from ``TUtils.txt2rdf`` for one example.

    Both sequences are trimmed, then passed through
    ``TUtils.balance_paddings`` before being scored.
    """
    ref = _trim(expected)
    pred = _trim(predicted)
    ref, pred = TUtils.balance_paddings(ref, pred, PAD_TOKEN)
    precision, recall = TUtils.txt2rdf(ref, pred)
    return [precision["precision"], recall["recall"]]  # type: ignore


# One list per task type; each entry holds the metrics of a single example.
task_1_metrics = []  # RDF2TXT      -> [bleu, rougeL, meteor]
task_2_metrics = []  # TEXT2RDF     -> [precision, recall]
task_3_metrics = []  # MASKING      -> accuracy
task_4_metrics = []  # COMPLETATION -> [precision, recall]

with torch.no_grad():
    # The counter advances on every example, whatever its task type, so the
    # progress line identifies each example.
    for example_num, example in enumerate(TEST_BATCHER.batch(1)):

        print(f"DOING Example: {example_num}")

        src_x, tgt_y, pad_x, _pad_y, tasktype = example

        # Only the encoder side is fed to inference().
        enc_x = torch.tensor(src_x)
        enc_x_pad = torch.tensor(pad_x, dtype=torch.bool)

        out: torch.Tensor = NANOSOCRATES.inference((enc_x, enc_x_pad), tasktype)

        # Batch size is 1: keep the single generated sequence and append an
        # END token before it is handed to remove_padding below.
        tokens: list[int] = out.tolist()[0]
        tokens.append(END_TOKEN)
        tokens = _collapse_mask_ids(tokens)
        out_string = TOKENANO.decode(tokens)

        exp_tokens: list[int] = _collapse_mask_ids(tgt_y[0])
        exp_string = TOKENANO.decode(exp_tokens)

        enc_tokens: list[int] = _collapse_mask_ids(src_x[0])
        enc_string = TOKENANO.decode(enc_tokens)

        print(f"PROMPT:\n{enc_string}")
        print(f"EXPECTED:\n{exp_string}")
        print(f"ACTUAL:\n{out_string}")

        if tasktype == Batch.TaskType.RDF2TXT:
            ref_str = TOKENANO.decode(_trim(exp_tokens))
            pred_str = TOKENANO.decode(_trim(tokens))

            bleu, rouge, meteor = TUtils.rdf2txt([ref_str], [pred_str])
            task_1_metrics.append(
                [bleu["bleu"], rouge["rougeL"], meteor["meteor"]]  # type: ignore
            )

        elif tasktype == Batch.TaskType.TEXT2RDF:
            # NOTE(review): only this task skips the first predicted token
            # (presumably the start marker) - confirm the asymmetry is wanted.
            task_2_metrics.append(_precision_recall(exp_tokens, tokens[1:]))

        elif tasktype == Batch.TaskType.MASKING:
            ref, pred = TUtils.balance_paddings(
                _trim(exp_tokens), _trim(tokens), PAD_TOKEN
            )
            accuracy = TUtils.accuracy(ref, pred)
            task_3_metrics.append(accuracy["accuracy"])  # type: ignore

        elif tasktype == Batch.TaskType.COMPLETATION:
            task_4_metrics.append(_precision_recall(exp_tokens, tokens))

# Split the per-example rows into one column per metric.
bleus = [row[0] for row in task_1_metrics]
rouges = [row[1] for row in task_1_metrics]
meteors = [row[2] for row in task_1_metrics]

prec_1 = [row[0] for row in task_2_metrics]
rec_1 = [row[1] for row in task_2_metrics]

acc = task_3_metrics

prec_2 = [row[0] for row in task_4_metrics]
rec_2 = [row[1] for row in task_4_metrics]

BLEU = TUtils.average(bleus)
ROUGE = TUtils.average(rouges)
METEOR = TUtils.average(meteors)

# F1 is computed from the averaged precision and recall, not per example.
PREC_1 = TUtils.average(prec_1)
REC_1 = TUtils.average(rec_1)
F1_1 = TUtils.f1(PREC_1, REC_1)

ACC = TUtils.average(acc)

PREC_2 = TUtils.average(prec_2)
REC_2 = TUtils.average(rec_2)
F1_2 = TUtils.f1(PREC_2, REC_2)

SEPARATOR = "**************************************************************************"
OUTPUT = "".join([
    f"{SEPARATOR}\n",
    "*\tRDF2TXT:\n",
    f"*\t\tBLEU: {BLEU} - ROUGE: {ROUGE} - METEOR: {METEOR}\n",
    f"{SEPARATOR}\n",
    "*\tTXT2RDF:\n",
    f"*\t\tPRECISION: {PREC_1} - RECALL: {REC_1} - F1: {F1_1}\n",
    f"{SEPARATOR}\n",
    "*\tRDF Completion 1:\n",
    f"*\t\tACCURACY: {ACC}\n",
    f"{SEPARATOR}\n",
    "*\tRDF Completion 2:\n",
    f"*\t\tPRECISION: {PREC_2} - RECALL: {REC_2} - F1: {F1_2}\n",
    f"{SEPARATOR}\n",
    "",
])

print(OUTPUT)

print("\nDEBUG")
print(task_1_metrics)
print(task_2_metrics)
print(task_3_metrics)
print(task_4_metrics)