import random
import sys
from typing import Any, Generator
import pandas as pd
from pathlib import Path
from ..Enums import TaskType
import Project_Model.Libs.BPE as BPE

# from Scripts.Libs.CleaningPipeline.special_token import SpecialToken
from Project_Model.Libs.Transformer import (
    SpannedMasker,
    truncate_rdf_list,
    normalize_sequence,
)

from Project_Model.Libs.BPE import SpecialToken


class Batcher:

    def __init__(
        self,
        dataset_path: Path,
        max_length: int,
        tokenizer: BPE.TokeNanoCore,
        masker: SpannedMasker,
        seed: int = 0,
    ) -> None:
        # Each dataset row provides an ABSTRACT and its RDF TRIPLEs.
        # Tasks built from a row:
        #   rdf2text: X: TRIPLE, Y: ABSTRACT
        #   text2rdf: X: ABSTRACT, Y: TRIPLE
        #   masking (calls the masker): X: incomplete_triple, Y: complete_triple (fill-in-the-blank, exam-style)
        #   completation: X: TRIPLE SUBSET, Y: related TRIPLE SUBSET
        # Sequences are truncated/padded to max_length; the span masker is provided
        # by the caller and truncate_rdf_list serves as the completion-task truncator.
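        # Illustration (a sketch only; the actual token ids depend on the vocabulary):
        #   rdf2text encoder input = tokenizer.encode(SpecialToken.RDF_TO_TEXT.value) + tokenized RDFs
        #   text2rdf encoder input = tokenizer.encode(SpecialToken.TEXT_TO_RDF.value) + tokenized Abstract
        # Both are then padded/truncated to max_length by __normalization.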
        self._dataset_path = dataset_path
        self._tokenizer = tokenizer
        self._masker = masker
        self.__max_length = max_length
        self._seed = seed
        # self._token_completation = TokenCompletationTransformer(sotl,eos)
        self._completation_task_token_truncator = truncate_rdf_list
    def batch(self, batch_size: int) -> Generator[
        tuple[
            list[list[int]],
            list[list[int]],
            list[list[int]],
            list[list[int]],
            TaskType,
        ],
        Any,
        Any,
    ]:
        """
        Reads the dataset CSV in chunks of `batch_size` rows and, for each chunk,
        yields one (X, Y, padding_X, padding_Y, task_type) tuple per task:
        RDF2TXT, TEXT2RDF, MASKING and COMPLETATION.
        """
        RNG = random.Random(self._seed)
        self._masker.reseed(self._seed)
        for batch in pd.read_csv(self._dataset_path, chunksize=batch_size):

            tokenized_batch = pd.DataFrame()
            # encode the Abstract and RDFs columns into token ids
            tokenized_batch[["Abstract", "RDFs"]] = batch[["Abstract", "RDFs"]].map(
                lambda t: self._tokenizer.encode(t)
            )

            X, Y, padding_X, padding_Y = self.__rdf2txt_transformation(tokenized_batch)
            yield X, Y, padding_X, padding_Y, TaskType.RDF2TXT

            X, Y, padding_X, padding_Y = self.__txt2rdf_transformation(tokenized_batch)
            yield X, Y, padding_X, padding_Y, TaskType.TEXT2RDF

            X, Y, padding_X, padding_Y = self.__masking_trasformation(tokenized_batch)
            yield X, Y, padding_X, padding_Y, TaskType.MASKING

            X, Y, padding_X, padding_Y = self.__token_completation_task(
                tokenized_batch, RNG.randint(0, sys.maxsize)
            )
            yield X, Y, padding_X, padding_Y, TaskType.COMPLETATION

            # output = pd.concat([rdf2txt_batch, txt2rdf_batch, completation_batch], ignore_index=True)
            # output = output.sample(frac=1).reset_index(drop=True)
            # self.decode_debug(output)
            # yield output
    def __random_subset_rdfs(self, batch: pd.DataFrame, seed=0):
        # WIP: split the serialized RDFs into a list of triples (rng not used yet)
        rng = random.Random(seed)

        def to_list(x):
            return x.split(SpecialToken.START_TRIPLE.value)[1:]

        batch["RDFs"] = batch["RDFs"].map(to_list)

    def decode_debug(self, batch: pd.DataFrame):
        decoded = pd.DataFrame()
        decoded[["X", "Y"]] = batch[["X", "Y"]].map(lambda t: self._tokenizer.decode(t))
        print(decoded)
    def __normalization(
        self, X: list[list[int]], Y: list[list[int]]
    ) -> tuple[list[list[int]], list[list[int]], list[list[int]], list[list[int]]]:
        pad_token = self._tokenizer.encode(SpecialToken.PAD.value)[0]
        end_token = self._tokenizer.encode(SpecialToken.END_OF_SEQUENCE.value)[0]
        out_X = []
        padding_X = []
        out_Y = []
        padding_Y = []

        for x in X:
            out_x, padding_x = normalize_sequence(
                x, self.__max_length, pad_token, end_token, True
            )
            out_X.append(out_x)
            padding_X.append(padding_x)

        for y in Y:
            out_y, padding_y = normalize_sequence(
                y, self.__max_length, pad_token, end_token, True
            )
            out_Y.append(out_y)
            padding_Y.append(padding_y)

        return out_X, out_Y, padding_X, padding_Y
    def __rdf2txt_transformation(self, batch: pd.DataFrame):
        task_token = self._tokenizer.encode(SpecialToken.RDF_TO_TEXT.value)
        out = batch.rename(columns={"RDFs": "X", "Abstract": "Y"})[["X", "Y"]]
        out["X"] = [task_token + x for x in out["X"]]
        return self.__normalization(out["X"].to_list(), out["Y"].to_list())

    def __txt2rdf_transformation(self, batch: pd.DataFrame):
        task_token = self._tokenizer.encode(SpecialToken.TEXT_TO_RDF.value)
        out = batch.rename(columns={"Abstract": "X", "RDFs": "Y"})[["X", "Y"]]
        out["X"] = [task_token + x for x in out["X"]]
        return self.__normalization(out["X"].to_list(), out["Y"].to_list())
    def __masking_trasformation(self, batch: pd.DataFrame):
        X = []
        Y = []
        for rdf in batch["RDFs"]:
            x, y = self._masker.mask_sequence(rdf)
            X.append(x)
            Y.append(y)
        return self.__normalization(X, Y)
    def __token_completation_task(self, batch: pd.DataFrame, minibatch_seed: int):
        continue_triple_token = self._tokenizer.encode(SpecialToken.CONTINUE_RDF.value)[0]
        eot = self._tokenizer.encode(SpecialToken.END_TRIPLE.value)[0]
        X = []
        Y = []
        for rdf in batch["RDFs"]:
            # first truncate to max_length (the truncator splits on "eot", so this is safe)
            rdf = rdf[: self.__max_length]
            x, y = self._completation_task_token_truncator(
                rdf, 0.5, continue_triple_token, eot, minibatch_seed
            )
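            # Sketch of the intent (based on the task comment in __init__, not on
            # truncate_rdf_list internals): roughly half of the triples stay in x,
            # terminated by the CONTINUE_RDF marker, and the remaining related
            # triples become the target y.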
            X.append(x)
            Y.append(y)
        return self.__token_cmpletation_task_special_normalization(X, Y)
    def __token_cmpletation_task_special_normalization(
        self, X: list[list[int]], Y: list[list[int]]
    ) -> tuple[list[list[int]], list[list[int]], list[list[int]], list[list[int]]]:

        def continue_rdf_padding(sequence: list[int], pad_token: int):
            # Builds the padding mask for x: every position after the first
            # occurrence of `pad_token` (in practice the CONTINUE_RDF marker) is
            # flagged as padding; the marker itself stays outside the mask.
            for i, x in enumerate(sequence):
                if x == pad_token:
                    i = i + 1  # CONTINUE_RDF will be excluded by the mask
                    # fill the tail with True and stop
                    return [False] * i + [True] * (len(sequence) - i)
            return [False] * len(sequence)  # marker not found

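        # Worked example (illustrative token ids only): with the marker id 9,
        # continue_rdf_padding([5, 7, 9, 3, 0], 9) returns
        # [False, False, False, True, True], i.e. the positions after the marker
        # are flagged as padding.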
        pad_token = self._tokenizer.encode(SpecialToken.PAD.value)[0]
        end_token = self._tokenizer.encode(SpecialToken.END_OF_SEQUENCE.value)[0]
        continue_rdf = self._tokenizer.encode(SpecialToken.CONTINUE_RDF.value)[0]
        out_X = []
        padding_X = []
        out_Y = []
        padding_Y = []

        for x in X:
            out_x, _ = normalize_sequence(
                x, self.__max_length, pad_token, end_token, True
            )
            out_X.append(out_x)
            # padding_X.append(padding_x)
            # X gets the special mask that marks everything after CONTINUE_RDF as padding
            special_padding = continue_rdf_padding(out_x, continue_rdf)
            padding_X.append(special_padding)

        for y in Y:
            out_y, padding_y = normalize_sequence(
                y, self.__max_length, pad_token, end_token, True
            )
            out_Y.append(out_y)
            # special padding
            # special_padding = continue_rdf_padding(out_y, continue_rdf)
            # padding_Y.append(special_padding)
            padding_Y.append(padding_y)

        return out_X, out_Y, padding_X, padding_Y


if __name__ == "__main__":

    DATASET_PATH = Path("Assets/Dataset/Tmp/rdf_text.csv")
    VOCABULARY_PATH = "Assets/Dataset/Tmp/trimmed.json"

    VOCABULARY = BPE.load_nanos_vocabulary(Path(VOCABULARY_PATH))
    SPECIAL_LIST = BPE.default_special_tokens()
    TOKENANO = BPE.TokeNanoCore(VOCABULARY, SPECIAL_LIST)
    SPECIAL_TOKENS: set[int] = set(TOKENANO.encode("".join(SPECIAL_LIST)))

    MASKER = SpannedMasker(TOKENANO.vocabulary_size, SPECIAL_TOKENS)

    prova = "<ABS>Cactus Flower is a 1969 American screwball comedy film directed by Gene Saks, and starring Walter Matthau, Ingrid Bergman and Goldie Hawn, who won an Academy Award for her performance.The screenplay was adapted by I. A. L. Diamond from the 1965 Broadway play of the same title written by Abe Burrows, which, in turn, is based on the French play Fleur de cactus by Pierre Barillet and Jean-Pierre Gredy. Cactus Flower was the ninth highest-grossing film of 1969."
    print(TOKENANO.encode(prova))

    batcher = Batcher(DATASET_PATH, 256, TOKENANO, MASKER)
    for batch in batcher.batch(8):
        print(batch)
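
    # Minimal sketch of consuming the generator (assumes the CSV has the
    # "Abstract" and "RDFs" columns the Batcher expects): each yielded tuple
    # unpacks into token matrices, padding masks and the task tag.
    X, Y, padding_X, padding_Y, task = next(batcher.batch(8))
    print(task, len(X), len(X[0]))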