2025-10-08 11:39:08 +02:00
import random
from typing import Generator
2025-10-07 15:36:51 +02:00
import pandas as pd
2025-10-08 11:39:08 +02:00
2025-10-07 17:41:53 +02:00
import Project_Model . Libs . BPE as BPE
from Scripts . Libs . CleaningPipeline . special_token import SpecialToken
2025-10-08 00:39:16 +02:00
from Project_Model . Libs . Transformer . Classes . SpannedMasker import SpannedMasker
2025-10-08 11:26:47 +02:00
from TokenCompletation import TokenCompletationTransformer
from Project_Model . Libs . BPE . Enums . SpecialToken import SpecialToken
2025-10-08 11:39:08 +02:00
2025-10-07 15:36:51 +02:00
class Batcher :
2025-10-08 00:39:16 +02:00
def __init__ ( self , dataset_path : str , batch_size : int , tokenizer : BPE . TokeNanoCore , masker : SpannedMasker ) - > None :
2025-10-07 15:36:51 +02:00
# ABSTRACT, TRIPLE
# tasks:
# rdf2text: X: TRIPLE, Y: ABSTRACT
# text2rdf: X: ABSTRACT, X:TRIPLE
# masking ( call masker): X: incomplete_triple Y: complete_triple (as exam)
# completation: X: TRIPLE SUBSET, Y: related TRIPLE SUBSET
self . _dataset_path = dataset_path
self . _batch_size = batch_size
self . _tokenizer = tokenizer
2025-10-08 00:39:16 +02:00
self . _masker = masker
2025-10-07 15:36:51 +02:00
2025-10-08 11:26:47 +02:00
sotl = self . _tokenizer . encode ( SpecialToken . START_TRIPLE_LIST . value )
eos = self . _tokenizer . encode ( SpecialToken . END_OF_SEQUENCE . value )
self . _token_completation = TokenCompletationTransformer ( sotl , eos )
2025-10-08 11:39:08 +02:00
def get_batch ( self ) - > Generator [ pd . DataFrame ] :
2025-10-08 11:26:47 +02:00
for batch in pd . read_csv ( self . _dataset_path , chunksize = int ( self . _batch_size / 4 ) ) : #now we support 3 task
2025-10-08 11:39:08 +02:00
2025-10-07 17:41:53 +02:00
tokenized_batch = pd . DataFrame ( )
2025-10-08 11:39:08 +02:00
tokenized_batch [ [ " Abstract " , " RDFs " ] ] = (
batch [ [ " Abstract " , " RDFs " ] ]
. map ( lambda t : self . _tokenizer . encode ( t ) )
)
2025-10-07 15:36:51 +02:00
2025-10-07 17:41:53 +02:00
rdf2txt_batch = self . __rdf2txt_transformation ( tokenized_batch )
txt2rdf_batch = self . __txt2rdf_transformation ( tokenized_batch )
2025-10-08 00:39:16 +02:00
mask_batch = self . __masking_trasformation ( tokenized_batch )
2025-10-08 11:39:08 +02:00
completation_batch = self . __token_completation_task ( tokenized_batch )
2025-10-07 15:36:51 +02:00
2025-10-08 11:39:08 +02:00
output = pd . concat ( [ rdf2txt_batch , txt2rdf_batch , mask_batch , completation_batch ] , ignore_index = True )
output = output . sample ( frac = 1 ) . reset_index ( drop = True )
2025-10-07 17:41:53 +02:00
yield output
2025-10-07 15:36:51 +02:00
2025-10-08 11:39:08 +02:00
def __random_subset_rdfs ( self , batch : pd . DataFrame , seed = 0 ) :
# WIP
2025-10-07 20:09:51 +02:00
rng = random . Random ( seed )
def to_list ( x ) :
return x . split ( SpecialToken . START_TRIPLE . value ) [ 1 : ]
2025-10-07 17:41:53 +02:00
batch [ " RDFs " ] = batch [ " RDFs " ] . map (
2025-10-07 20:09:51 +02:00
to_list
2025-10-07 17:41:53 +02:00
)
2025-10-07 15:36:51 +02:00
def __rdf2txt_transformation ( self , batch : pd . DataFrame ) :
2025-10-07 17:41:53 +02:00
batch = batch . rename ( columns = { " RDFs " : " X " , " Abstract " : " Y " } )
2025-10-08 11:39:08 +02:00
return batch [ [ " X " , " Y " ] ]
2025-10-07 15:36:51 +02:00
def __txt2rdf_transformation ( self , batch : pd . DataFrame ) :
2025-10-07 17:41:53 +02:00
batch = batch . rename ( columns = { " Abstract " : " X " , " RDFs " : " Y " } )
2025-10-08 11:39:08 +02:00
return batch [ [ " X " , " Y " ] ]
2025-10-07 15:36:51 +02:00
2025-10-08 00:39:16 +02:00
def __masking_trasformation ( self , batch : pd . DataFrame ) :
# mask_sequence: List[int] -> Tuple[List[int], List[int]]
xy_tuples = batch [ " RDFs " ] . apply ( self . _masker . mask_sequence ) # Series of (X, Y)
output = batch . copy ( )
# Expand into two columns preserving the original index
output [ [ " X " , " Y " ] ] = pd . DataFrame ( xy_tuples . tolist ( ) , index = batch . index )
return output [ [ " X " , " Y " ] ]
2025-10-08 11:26:47 +02:00
def __token_completation_task ( self , batch : pd . DataFrame ) :
xy_tuples = batch [ " RDFs " ] . apply ( self . _token_completation . get_completation_tuple )
output = batch . copy ( )
output [ [ " X " , " Y " ] ] = pd . DataFrame ( xy_tuples . tolist ( ) , index = batch . index )
return output [ [ " X " , " Y " ] ]
2025-10-07 17:41:53 +02:00
2025-10-08 11:26:47 +02:00
"""
2025-10-07 17:41:53 +02:00
DATASET_PATH = " Assets/Dataset/Tmp/rdf_text.csv "
VOCABULARY_path = " Assets/Dataset/Tmp/trimmed.json "
2025-10-08 00:39:16 +02:00
2025-10-08 11:39:08 +02:00
from pathlib import Path
2025-10-07 17:41:53 +02:00
VOCABULARY = BPE . load_nanos_vocabulary ( Path ( VOCABULARY_path ) )
2025-10-08 00:39:16 +02:00
SPECIAL_LIST = BPE . default_special_tokens ( )
TOKENANO = BPE . TokeNanoCore ( VOCABULARY , SPECIAL_LIST )
SPECIAL_TOKENS : set [ int ] = set ( TOKENANO . encode ( " " . join ( SPECIAL_LIST ) ) )
MASKER = SpannedMasker ( TOKENANO . vocabulary_size , SPECIAL_TOKENS )
2025-10-07 17:41:53 +02:00
prova = " <ABS>Cactus Flower is a 1969 American screwball comedy film directed by Gene Saks, and starring Walter Matthau, Ingrid Bergman and Goldie Hawn, who won an Academy Award for her performance.The screenplay was adapted by I. A. L. Diamond from the 1965 Broadway play of the same title written by Abe Burrows, which, in turn, is based on the French play Fleur de cactus by Pierre Barillet and Jean-Pierre Gredy. Cactus Flower was the ninth highest-grossing film of 1969. "
print ( TOKENANO . encode ( prova ) )
2025-10-08 11:26:47 +02:00
batcher = Batcher ( DATASET_PATH , 8 , TOKENANO , MASKER )
2025-10-07 17:41:53 +02:00
for batch in batcher . get_batch ( ) :
2025-10-08 11:26:47 +02:00
print ( batch )
"""