Merge branch 'dev.etl' into dev

This commit is contained in:
Christian Risi 2025-10-05 11:16:54 +02:00
commit ba3a718480
3 changed files with 41 additions and 12 deletions

View File

@ -0,0 +1,21 @@
import pandas as pd
class Debug_csv():
def __init__(self, output_path:str):
self.output = open(output_path, "w")
# then the first row as header
header = ["MovieURI","SubjectURI","RelationshipURI","ObjectURI","Abstract"]
self.output.write(",".join(header) + "\n")
def close(self):
self.output.close()
def write(self, RDF: pd.DataFrame):
"""
Args:
RDF (pd.DataFrame): ["MovieURI","SubjectURI","RelationshipURI","ObjectURI","Abstract"]
"""
RDF.to_csv(self.output, index=False, header=False)

View File

@ -186,3 +186,9 @@ class PipelineApplier():
# as input two dataframe, one with 2 column # as input two dataframe, one with 2 column
return None return None
def regex_on_objects(self, RDF: pd.DataFrame) -> pd.DataFrame:
RDF["ObjectURI"] = (RDF["ObjectURI"].astype("string")
.str.replace(r"\r?\n+", ", ", regex=True) # newlines -> ", "
.str.replace(r"\*", "", regex=True)) # delete all asterisks
return RDF

View File

@ -6,17 +6,12 @@ from Scripts.DataCleaning.data_output_models.rdf_mask_task import RDF_mask_task_
from Scripts.DataCleaning.data_output_models.bpe_corpus import BPE_corpus from Scripts.DataCleaning.data_output_models.bpe_corpus import BPE_corpus
from Scripts.DataCleaning.data_output_models.rdf_text_tasks import RDF_text_task_dataset from Scripts.DataCleaning.data_output_models.rdf_text_tasks import RDF_text_task_dataset
from Scripts.DataCleaning.data_output_models.rdf_completation_task import RDF_completation_task_dataset from Scripts.DataCleaning.data_output_models.rdf_completation_task import RDF_completation_task_dataset
from Scripts.DataCleaning.data_output_models.debug_csv import Debug_csv
import pandas as pd import pandas as pd
class Pipeline(): class Pipeline():
def __init__(self, def __init__(self):
mask_task_dataset_path:str = "./Assets/Dataset/Tmp/rdf_mask.csv",
bpe_corpus_path:str = "./Assets/Dataset/Tmp/corpus.txt",
text_to_rdf_task_dataset_path:str = "./Assets/Dataset/Tmp/rdf_text.csv",
completation_rdf_task_dataset_path:str = "./Assets/Dataset/Tmp/rdf_completation.csv",
):
self.sql_endpoint = SqlEndpoint() self.sql_endpoint = SqlEndpoint()
# classes to manage taskes' datasets # classes to manage taskes' datasets
self.task_rdf_mask = RDF_mask_task_dataset(mask_task_dataset_path) self.task_rdf_mask = RDF_mask_task_dataset(mask_task_dataset_path)
@ -98,6 +93,8 @@ class Pipeline():
# other filter # other filter
# #
RDF = self.filter_applier.delete_relationship_by_list_filter(RDF) RDF = self.filter_applier.delete_relationship_by_list_filter(RDF)
# regex on ObjectURI
RDF = self.filter_applier.regex_on_objects(RDF)
if RDF.empty: if RDF.empty:
continue continue
RDF = self.filter_applier.rdf_add_special_token(RDF) # WARNING, THIS MUST BE DONE AFTER FILTER BY FREQUENCE RDF = self.filter_applier.rdf_add_special_token(RDF) # WARNING, THIS MUST BE DONE AFTER FILTER BY FREQUENCE
@ -119,9 +116,13 @@ class Pipeline():
movie_list = [117248, 147074, 113621, 1123, 117586, 90177, 71587, 138952, 144137, 148025] movie_list = [117248, 147074, 113621, 1123, 117586, 90177, 71587, 138952, 144137, 148025]
self.sql_endpoint.movie_ids = movie_list self.sql_endpoint.movie_ids = movie_list
def reduce_movie_list(self, starting_offset:int , ending_offset:int): def generate_csv_debug_file(self, debug_path:str):
self.filter_applier.reduce_movie_list(starting_offset,ending_offset) debug_csv = Debug_csv(debug_path)
for RDF in self._get_cleaned_movie_rows():
debug_csv.write(RDF)
debug_csv.close()
# there are a lot of settings to manage # there are a lot of settings to manage
@ -132,9 +133,10 @@ class Pipeline():
#pipeline = Pipeline() #pipeline = Pipeline()
# pipeline.use_toy_dataset() pipeline.use_toy_dataset()
# pipeline.execute_task_bpe_corpus() # pipeline.execute_task_bpe_corpus()
# pipeline.execute_task_rdf_mask() # pipeline.execute_task_rdf_mask()
# pipeline.execute_tasks_rdf_text() # pipeline.execute_tasks_rdf_text()
# pipeline.execute_task_rdf_completation() # pipeline.execute_task_rdf_completation()
# pipeline.execute_all_task() # pipeline.execute_all_task()
pipeline.generate_csv_debug_file("Assets/Dataset/Tmp/debug.csv")