From 8167c9d435b15a4f189d57b6644a376fed2f2e2c Mon Sep 17 00:00:00 2001 From: GassiGiuseppe Date: Mon, 29 Sep 2025 16:03:49 +0200 Subject: [PATCH] Added Toy Dataset entry point into the Pipeline class Before it was forced into the sql_endpoint, now all the pipeline can be managed in the Pipeline class --- Scripts/DataCleaning/pipeline.py | 78 ++++++++++++------- Scripts/Libs/CleaningPipeline/sql_endpoint.py | 12 +-- 2 files changed, 57 insertions(+), 33 deletions(-) diff --git a/Scripts/DataCleaning/pipeline.py b/Scripts/DataCleaning/pipeline.py index e07294b..eb5b2f7 100644 --- a/Scripts/DataCleaning/pipeline.py +++ b/Scripts/DataCleaning/pipeline.py @@ -10,22 +10,22 @@ from Scripts.DataCleaning.data_output_models.rdf_completation_task import RDF_co import pandas as pd class Pipeline(): - def __init__(self, output): + def __init__(self): self.sql_endpoint = SqlEndpoint() # classes to manage taskes' datasets - self.task_rdf_mask = RDF_mask_task_dataset("./Assets/Dataset/Tmp/debug.csv") - self.task_bpe_corpus = BPE_corpus("./Assets/Dataset/Tmp/output.txt") + self.task_rdf_mask = RDF_mask_task_dataset("./Assets/Dataset/Tmp/rdf_mask.csv") + self.task_bpe_corpus = BPE_corpus("./Assets/Dataset/Tmp/corpus.txt") self.task_rdf_text = RDF_text_task_dataset("./Assets/Dataset/Tmp/rdf_text.csv") self.task_rdf_completation = RDF_completation_task_dataset("./Assets/Dataset/Tmp/rdf_completation.csv") # prepare the filter - # the filter applier needs to know thefrequence of Movies and Relationship among all the Dataset + # the filter applier needs to know the frequence of Movies and Relationship among all the Dataset self.filter_applier = PipelineApplier() MOVIE_COUNT = self.sql_endpoint.get_movies_id_count() REL_COUNT = self.sql_endpoint.get_relationship_count() self.filter_applier.generate_frequency_movie_filter(MOVIE_COUNT,50,3000) self.filter_applier.generate_frequency_relationship_filter(REL_COUNT, 50, 2395627) - # prepare the filter ot the relationshipURI you want to delete: + # prepare the filter on the relationshipURI you want to delete: relationship_uri_banned_list = [ "dbp-dbp:wikiPageUsesTemplate","w3:2000/01/rdf-schema#label","dbp-dbo:abstract", "dbp-dbo:wikiPageID","dbp-dbo:wikiPageRevisionID", "dbp-dbo:wikiPageDisambiguates", @@ -34,25 +34,6 @@ class Pipeline(): self.filter_applier.generate_list_relationship_filter(relationship_uri_banned_list) - def _end_file_handler(self): - self.task_bpe_corpus.close() - self.task_rdf_mask.close() - self.task_rdf_text.close() - self.task_rdf_completation.close() - - def _get_cleaned_movie_rows(self): - for RDF in self.sql_endpoint.get_abbreviated_dataset_by_movie_id(): - RDF = self.filter_applier.drop_na_from_dataset(RDF) - RDF = self.filter_applier.filter_by_frequency_movie_id(RDF) - RDF = self.filter_applier.filter_by_frequency_relationship(RDF) - # other filter - # - RDF = self.filter_applier.delete_relationship_by_list_filter(RDF) - if RDF.empty: - continue - RDF = self.filter_applier.rdf_add_special_token(RDF) # WARNING, THIS MUST BE DONE AFTER FILTER BY FREQUENCE - yield RDF - def execute_task_bpe_corpus(self): for RDF in self._get_cleaned_movie_rows(): RDF = self.filter_applier.rebuild_by_movie(RDF) @@ -66,12 +47,14 @@ class Pipeline(): self.task_rdf_mask.write(RDF) self._end_file_handler() + def execute_tasks_rdf_text(self): for RDF in self._get_cleaned_movie_rows(): RDF = self.filter_applier.rebuild_by_movie(RDF) self.task_rdf_text.write(RDF) self._end_file_handler() + def execute_task_rdf_completation(self): for RDF in self._get_cleaned_movie_rows(): RDF["Triple"] = self.filter_applier.build_triple(RDF) @@ -92,14 +75,55 @@ class Pipeline(): self.task_bpe_corpus.write_from_df(RDF[["Triple","Abstract"]]) self._end_file_handler() - + + + def _end_file_handler(self): + self.task_bpe_corpus.close() + self.task_rdf_mask.close() + self.task_rdf_text.close() + self.task_rdf_completation.close() + + + def _get_cleaned_movie_rows(self): + for RDF in self.sql_endpoint.get_abbreviated_dataset_by_movie_id(): + RDF = self.filter_applier.drop_na_from_dataset(RDF) + RDF = self.filter_applier.filter_by_frequency_movie_id(RDF) + RDF = self.filter_applier.filter_by_frequency_relationship(RDF) + # other filter + # + RDF = self.filter_applier.delete_relationship_by_list_filter(RDF) + if RDF.empty: + continue + RDF = self.filter_applier.rdf_add_special_token(RDF) # WARNING, THIS MUST BE DONE AFTER FILTER BY FREQUENCE + yield RDF + + + def use_toy_dataset(self): + # CHOOSEN MOVIE: + # The Dark Knight : 117248 + # Inception : 147074 + # The Avengers : 113621 + # Cast Away : 1123 + # The Departed : 117586 + # American Psycho : 90177 + # Avatar : 71587 + # Django Unchained : 138952 + # Spirited Away : 144137 + # Knives Out : 148025 + movie_list = [117248, 147074, 113621, 1123, 117586, 90177, 71587, 138952, 144137, 148025] + self.sql_endpoint.movie_ids = movie_list +# there are a lot of settings to manage +# you only need to change settings: +# in the init for file paths, frequency filter limit, banned reletionshipURI +# in the use_toy_dataset , to change the toy dataset +# in _get_cleaned_movie_rows: to change how the pipeline behave +pipeline = Pipeline() - -pipeline = Pipeline("./Assets/Dataset/Tmp/output.txt") +# pipeline.use_toy_dataset() # pipeline.execute_task_bpe_corpus() # pipeline.execute_task_rdf_mask() # pipeline.execute_tasks_rdf_text() diff --git a/Scripts/Libs/CleaningPipeline/sql_endpoint.py b/Scripts/Libs/CleaningPipeline/sql_endpoint.py index 4e43528..66ba1ea 100644 --- a/Scripts/Libs/CleaningPipeline/sql_endpoint.py +++ b/Scripts/Libs/CleaningPipeline/sql_endpoint.py @@ -18,8 +18,8 @@ class SqlEndpoint(): # self.conn = self.sql_engine.connect().execution_options(stream_results=True) # it seems that sqlite doenst support streamer cursor # PRAGMA exeutes better in writing not reading - self.chunk_size_row = chunk_size_row - pass + self.chunk_size_row = chunk_size_row # not used now, since each chunk is a movie + self.movie_ids = movie_ids = pd.read_sql_query("SELECT MovieID FROM Movies;", self.sql_engine)["MovieID"] def get_RDF(self) -> pd.DataFrame : @@ -79,7 +79,7 @@ class SqlEndpoint(): Pandas.DataFrame: [MovieID, SubjectURI, RelationshipURI, ObjectURI, Abstract] """ # chunk by movieId, abstract is the same and some intersting logic are appliable - movie_ids = pd.read_sql_query("SELECT MovieID FROM Movies;", self.sql_engine)["MovieID"] + # movie_ids = pd.read_sql_query("SELECT MovieID FROM Movies;", self.sql_engine)["MovieID"] # CHOOSEN MOVIE: # The Dark Knight : 117248 # Inception : 147074 @@ -91,8 +91,8 @@ class SqlEndpoint(): # Django Unchained : 138952 # Spirited Away : 144137 # Knives Out : 148025 - movie_list = [117248, 147074, 113621, 1123, 117586, 90177, 71587, 138952, 144137, 148025] - movie_ids = movie_list + # movie_list = [117248, 147074, 113621, 1123, 117586, 90177, 71587, 138952, 144137, 148025] + # movie_ids = movie_list QUERY = """ SELECT MovieID, SubjectURI, RelationshipURI, ObjectURI, Abstract @@ -104,7 +104,7 @@ class SqlEndpoint(): WHERE MovieID = (?); """ - for movie_id in movie_ids: + for movie_id in self.movie_ids: yield pd.read_sql_query(QUERY, self.sql_engine, params=(movie_id,)) def get_movies_id_count(self) -> pd.DataFrame: