Batcher added

This commit is contained in:
GassiGiuseppe 2025-10-10 20:10:08 +02:00
parent bed9718f27
commit 96610612fe
3 changed files with 119 additions and 54 deletions

View File

@ -31,7 +31,7 @@ class TokeNanoCore:
def vocabulary_size(self):
BPE_VOC_SIZE = self.__bpe_encoder.vocabulary_size
SPECIAL_VOC_SIZE = self.__special_encoder.vocabulary_size
return BPE_VOC_SIZE + SPECIAL_VOC_SIZE
return BPE_VOC_SIZE + SPECIAL_VOC_SIZE + 1
def encode(self, corpus: str) -> list[int]:
output: list[int] = []

View File

@ -1,49 +1,68 @@
import random
from typing import Generator
import sys
from typing import Any, Generator
import pandas as pd
from pathlib import Path
from Project_Model.Libs.Batch.Enums.TaskType import TaskType
import Project_Model.Libs.BPE as BPE
from Scripts.Libs.CleaningPipeline.special_token import SpecialToken
from Project_Model.Libs.Transformer.Classes.SpannedMasker import SpannedMasker
# from Scripts.Libs.CleaningPipeline.special_token import SpecialToken
from Project_Model.Libs.Transformer import SpannedMasker, truncate_rdf_list, normalize_sequence
from TokenCompletation import TokenCompletationTransformer
from Project_Model.Libs.BPE.Enums.SpecialToken import SpecialToken
from Project_Model.Libs.BPE import SpecialToken
MAX_LENGHT = 128
class Batcher:
def __init__(self, dataset_path: str, batch_size:int, tokenizer: BPE.TokeNanoCore, masker: SpannedMasker) -> None:
def __init__(self, dataset_path: Path, tokenizer: BPE.TokeNanoCore, masker: SpannedMasker, seed:int = 0) -> None:
# ABSTRACT, TRIPLE
# tasks:
# rdf2text: X: TRIPLE, Y: ABSTRACT
# text2rdf: X: ABSTRACT, X:TRIPLE
# masking ( call masker): X: incomplete_triple Y: complete_triple (as exam)
# completation: X: TRIPLE SUBSET, Y: related TRIPLE SUBSET
# it will truncate
# it will instantiate spanmaskter and truncator
self._dataset_path = dataset_path
self._batch_size = batch_size
self._tokenizer = tokenizer
self._masker = masker
sotl = self._tokenizer.encode(SpecialToken.START_TRIPLE_LIST.value)
eos = self._tokenizer.encode(SpecialToken.END_OF_SEQUENCE.value)
self._token_completation = TokenCompletationTransformer(sotl,eos)
self._seed = seed
# self._token_completation = TokenCompletationTransformer(sotl,eos)
self._completation_task_token_truncator = truncate_rdf_list
def get_batch(self)-> Generator[pd.DataFrame]:
for batch in pd.read_csv(self._dataset_path, chunksize= int(self._batch_size/4)): #now we support 3 task
def batch(self, batch_size)-> Generator[tuple[list[list[int]], list[list[int]], list[list[int]],list[list[int]], TaskType],Any,Any]:
"""
Yields: X,Y,padding_X
"""
RNG = random.Random(self._seed)
self._masker.reseed(self._seed)
for batch in pd.read_csv(self._dataset_path, chunksize= int(batch_size)): #now we support 3 task
tokenized_batch = pd.DataFrame()
# encode
tokenized_batch[["Abstract","RDFs"]] = (
batch[["Abstract","RDFs"]]
.map(lambda t: self._tokenizer.encode(t))
)
rdf2txt_batch = self.__rdf2txt_transformation(tokenized_batch)
txt2rdf_batch = self.__txt2rdf_transformation(tokenized_batch)
mask_batch = self.__masking_trasformation(tokenized_batch)
completation_batch = self.__token_completation_task(tokenized_batch)
X,Y, padding_X, padding_Y = self.__rdf2txt_transformation(tokenized_batch)
yield X,Y, padding_X, padding_Y, TaskType.RDF2TXT
X,Y, padding_X, padding_Y, = self.__txt2rdf_transformation(tokenized_batch)
yield X,Y, padding_X, padding_Y, TaskType.TEXT2RDF
X,Y, padding_X, padding_Y, = self.__masking_trasformation(tokenized_batch)
yield X,Y, padding_X, padding_Y, TaskType.MASKING
X,Y, padding_X, padding_Y, = self.__token_completation_task(tokenized_batch, RNG.randint(0,sys.maxsize))
yield X,Y, padding_X, padding_Y, TaskType.COMPLETATION
output = pd.concat([rdf2txt_batch,txt2rdf_batch,mask_batch,completation_batch],ignore_index=True)
output = output.sample(frac=1).reset_index(drop=True)
yield output
# output = pd.concat([rdf2txt_batch,txt2rdf_batch,completation_batch],ignore_index=True)
# output = output.sample(frac=1).reset_index(drop=True)
# self.decode_debug(output)
# yield output
def __random_subset_rdfs(self, batch: pd.DataFrame, seed = 0):
@ -57,35 +76,77 @@ class Batcher:
to_list
)
def decode_debug(self, batch: pd.DataFrame):
decoded = pd.DataFrame()
decoded[["X","Y"]] = (
batch[["X","Y"]]
.map(lambda t: self._tokenizer.decode(t))
)
print(decoded)
def __normalization(self, X:list[list[int]], Y: list[list[int]])-> tuple[list[list[int]], list[list[int]], list[list[int]], list[list[int]]]:
pad_token = self._tokenizer.encode(SpecialToken.PAD.value)[0]
end_token = self._tokenizer.encode(SpecialToken.END_OF_SEQUENCE.value)[0]
out_X = []
padding_X = []
out_Y = []
padding_Y = []
for x in X:
out_x, padding_x = normalize_sequence(x,MAX_LENGHT,pad_token,end_token,True)
out_X.append(out_x)
padding_X.append(padding_x)
for y in Y:
out_y, padding_y = normalize_sequence(y,MAX_LENGHT,pad_token,end_token,True)
out_Y.append(out_y)
padding_Y.append(padding_y)
return out_X,out_Y,padding_X,padding_Y
def __rdf2txt_transformation(self, batch: pd.DataFrame):
batch = batch.rename(columns={"RDFs": "X", "Abstract": "Y"})
return batch[["X", "Y"]]
task_token = self._tokenizer.encode(SpecialToken.RDF_TO_TEXT.value)
out = batch.rename(columns={"RDFs":"X","Abstract":"Y"})[["X","Y"]]
out["X"] = [task_token + x for x in out["X"]]
return self.__normalization(out["X"].to_list(),out["Y"].to_list())
def __txt2rdf_transformation(self, batch: pd.DataFrame):
batch = batch.rename(columns={ "Abstract": "X","RDFs": "Y"})
return batch[["X", "Y"]]
task_token = self._tokenizer.encode(SpecialToken.TEXT_TO_RDF.value)
out = batch.rename(columns={"Abstract":"X","RDFs":"Y"})[["X","Y"]]
out["X"] = [task_token + x for x in out["X"]]
return self.__normalization(out["X"].to_list(),out["Y"].to_list())
def __masking_trasformation(self, batch: pd.DataFrame):
# mask_sequence: List[int] -> Tuple[List[int], List[int]]
xy_tuples = batch["RDFs"].apply(self._masker.mask_sequence) # Series of (X, Y)
output = batch.copy()
# Expand into two columns preserving the original index
output[["X", "Y"]] = pd.DataFrame(xy_tuples.tolist(), index=batch.index)
return output[["X", "Y"]]
X = []
Y = []
for rdf in batch["RDFs"]:
x,y = self._masker.mask_sequence(rdf)
X.append(x)
Y.append(y)
return self.__normalization(X,Y)
def __token_completation_task(self, batch: pd.DataFrame):
xy_tuples = batch["RDFs"].apply(self._token_completation.get_completation_tuple)
output = batch.copy()
output[["X", "Y"]] = pd.DataFrame(xy_tuples.tolist(), index=batch.index)
return output[["X", "Y"]]
def __token_completation_task(self, batch: pd.DataFrame, minibatch_seed: int):
continue_triple_token = self._tokenizer.encode(SpecialToken.CONTINUE_RDF.value)[0]
eot = self._tokenizer.encode(SpecialToken.END_TRIPLE.value)[0]
X = []
Y = []
for rdf in batch["RDFs"]:
x,y = self._completation_task_token_truncator(rdf, 0.5, continue_triple_token, eot, minibatch_seed)
X.append(x)
Y.append(y)
return self.__normalization(X,Y)
"""
DATASET_PATH = "Assets/Dataset/Tmp/rdf_text.csv"
if __name__ == "__main__":
DATASET_PATH = Path("Assets/Dataset/Tmp/rdf_text.csv")
VOCABULARY_path = "Assets/Dataset/Tmp/trimmed.json"
from pathlib import Path
@ -98,7 +159,6 @@ MASKER = SpannedMasker(TOKENANO.vocabulary_size,SPECIAL_TOKENS)
prova = "<ABS>Cactus Flower is a 1969 American screwball comedy film directed by Gene Saks, and starring Walter Matthau, Ingrid Bergman and Goldie Hawn, who won an Academy Award for her performance.The screenplay was adapted by I. A. L. Diamond from the 1965 Broadway play of the same title written by Abe Burrows, which, in turn, is based on the French play Fleur de cactus by Pierre Barillet and Jean-Pierre Gredy. Cactus Flower was the ninth highest-grossing film of 1969."
print(TOKENANO.encode(prova))
batcher = Batcher(DATASET_PATH,8,TOKENANO,MASKER)
for batch in batcher.get_batch():
batcher = Batcher(DATASET_PATH,TOKENANO,MASKER)
for batch in batcher.batch(8):
print(batch)
"""

View File

@ -25,6 +25,11 @@ class SpannedMasker:
self.__forbidden_tokens = forbidden_tokens
def reseed(self, seed:int):
self.__rng = random.Random(seed)
def mask_sequence(
self,
token_sequence: list[int],