From bd72ad3571bf2710cd154c5cf08b448dc194f13d Mon Sep 17 00:00:00 2001
From: GassiGiuseppe
Date: Mon, 29 Sep 2025 15:21:26 +0200
Subject: [PATCH] Added files to execute the complete cleaning pipeline

---
 .../data_output_models/bpe_corpus.py          |  21 ++
 .../rdf_completation_task.py                  |  26 +++
 .../data_output_models/rdf_mask_task.py       |  58 ++++++
 .../data_output_models/rdf_text_tasks.py      |  26 +++
 Scripts/DataCleaning/filter.py                | 184 ++++++++++++++++++
 Scripts/DataCleaning/pipeline.py              | 107 ++++++++++
 .../Libs/CleaningPipeline/special_token.py    |  21 ++
 Scripts/Libs/CleaningPipeline/sql_endpoint.py | 144 ++++++++++++++
 Scripts/Libs/Utils/dataframe_interaction.py   |   9 +
 9 files changed, 596 insertions(+)
 create mode 100644 Scripts/DataCleaning/data_output_models/bpe_corpus.py
 create mode 100644 Scripts/DataCleaning/data_output_models/rdf_completation_task.py
 create mode 100644 Scripts/DataCleaning/data_output_models/rdf_mask_task.py
 create mode 100644 Scripts/DataCleaning/data_output_models/rdf_text_tasks.py
 create mode 100644 Scripts/DataCleaning/filter.py
 create mode 100644 Scripts/DataCleaning/pipeline.py
 create mode 100644 Scripts/Libs/CleaningPipeline/special_token.py
 create mode 100644 Scripts/Libs/CleaningPipeline/sql_endpoint.py
 create mode 100644 Scripts/Libs/Utils/dataframe_interaction.py

diff --git a/Scripts/DataCleaning/data_output_models/bpe_corpus.py b/Scripts/DataCleaning/data_output_models/bpe_corpus.py
new file mode 100644
index 0000000..a0348b6
--- /dev/null
+++ b/Scripts/DataCleaning/data_output_models/bpe_corpus.py
@@ -0,0 +1,21 @@
+from Scripts.Libs.Utils.dataframe_interaction import get_raw_from_dataframe
+from Scripts.Libs.CleaningPipeline.special_token import SpecialToken
+import pandas as pd
+
+class BPE_corpus():
+
+    def __init__(self, output_path: str):
+        self.output_handler = open(output_path, "w")
+
+    def close(self):
+        # append the corpus-end token before closing
+        self.output_handler.write(SpecialToken.CORPUS_END.value)
+        self.output_handler.close()
+
+    def write_from_str(self, output: str):
+        if output == '':
+            return
+        self.output_handler.write(output)
+
+    def write_from_df(self, df: pd.DataFrame):
+        self.write_from_str(get_raw_from_dataframe(df))
\ No newline at end of file
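For orientation, a minimal usage sketch of `BPE_corpus`. The output path, the token spelling in the comment, and the toy DataFrame are invented for illustration:

```python
import pandas as pd
from Scripts.DataCleaning.data_output_models.bpe_corpus import BPE_corpus

# toy stand-in for the ["Triple", "Abstract"] slice the pipeline writes
df = pd.DataFrame({
    "Triple": ["<START_TRIPLE>s r o<END_TRIPLE>"],  # placeholder token spellings
    "Abstract": ["A thief who steals corporate secrets ..."],
})

corpus = BPE_corpus("/tmp/bpe_corpus.txt")  # hypothetical output path
corpus.write_from_df(df)  # flattens every cell of every row into one raw string
corpus.close()            # appends the CORPUS_END token before closing
```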
diff --git a/Scripts/DataCleaning/data_output_models/rdf_completation_task.py b/Scripts/DataCleaning/data_output_models/rdf_completation_task.py
new file mode 100644
index 0000000..111b2b9
--- /dev/null
+++ b/Scripts/DataCleaning/data_output_models/rdf_completation_task.py
@@ -0,0 +1,26 @@
+import pandas as pd
+
+class RDF_completation_task_dataset():
+    """
+    Write the CSV for the fourth task, which is "Predicting subsequent triples based on a given context".
+    Each RDF is saved as a string.
+    CSV composition: ["MovieID","RDF"]
+    """
+    def __init__(self, output_path: str):
+
+        self.output = open(output_path, "w")
+        # write the header row first
+        header = ["MovieID","RDF"]
+        self.output.write(",".join(header) + "\n")
+
+    def close(self):
+        self.output.close()
+
+    def write(self, RDF: pd.DataFrame):
+        """
+        Args:
+            RDF (pd.DataFrame): ["MovieID","RDF"]
+        """
+
+        RDF.to_csv(self.output, index=False, header=False)
\ No newline at end of file
diff --git a/Scripts/DataCleaning/data_output_models/rdf_mask_task.py b/Scripts/DataCleaning/data_output_models/rdf_mask_task.py
new file mode 100644
index 0000000..01b943d
--- /dev/null
+++ b/Scripts/DataCleaning/data_output_models/rdf_mask_task.py
@@ -0,0 +1,58 @@
+import pandas as pd
+
+# no circular-dependency risk: filter.py never imports this module back
+from Scripts.DataCleaning.filter import PipelineApplier
+
+class RDF_mask_task_dataset():
+    """
+    Write the CSV for the third task, which is "Predicting a masked component within an RDF triple".
+    For each RDF the CSV contains 3 rows, each one with a different component missing.
+    CSV composition: ["MovieID","IncompleteRDF","Missing","RDF"]
+    """
+    def __init__(self, output_path: str):
+
+        # these methods are only used by this class, but they belong at a lower level
+        self._build_triple = PipelineApplier.build_triple
+        self._build_incomplete_triple = PipelineApplier.build_incomplete_triple
+
+        self.output = open(output_path, "w")
+        # write the header row first
+        header = ["MovieID","IncompleteRDF","Missing","RDF"]
+        self.output.write(",".join(header) + "\n")
+
+    def close(self):
+        self.output.close()
+
+    def write(self, RDF: pd.DataFrame):
+        rdf_complete = self._build_triple(RDF)
+
+        rdf_without_subject = self._build_incomplete_triple(RDF.drop(columns=["SubjectURI"]))
+        rdf_without_relationship = self._build_incomplete_triple(RDF.drop(columns=["RelationshipURI"]))
+        rdf_without_object = self._build_incomplete_triple(RDF.drop(columns=["ObjectURI"]))
+        ####
+        df_subject = pd.DataFrame({
+            "MovieID": RDF["MovieID"],
+            "IncompleteRDF": rdf_without_subject,
+            "Missing": RDF["SubjectURI"],
+            "RDF": rdf_complete,
+        })
+
+        df_relationship = pd.DataFrame({
+            "MovieID": RDF["MovieID"],
+            "IncompleteRDF": rdf_without_relationship,
+            "Missing": RDF["RelationshipURI"],
+            "RDF": rdf_complete,
+        })
+
+        df_object = pd.DataFrame({
+            "MovieID": RDF["MovieID"],
+            "IncompleteRDF": rdf_without_object,
+            "Missing": RDF["ObjectURI"],
+            "RDF": rdf_complete,
+        })
+
+        output_df = pd.concat([df_subject, df_relationship, df_object], ignore_index=True)
+        output_df.to_csv(self.output, index=False, header=False)
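To make the three-row expansion concrete, a small sketch of feeding one already-tokenized triple through the writer; the URIs and token spellings are made up, only the output shape comes from the code above:

```python
import pandas as pd
from Scripts.DataCleaning.data_output_models.rdf_mask_task import RDF_mask_task_dataset

rdf = pd.DataFrame({
    "MovieID": [147074],
    "SubjectURI": ["<SUBJ>dbr:Inception"],
    "RelationshipURI": ["<REL>dbo:director"],
    "ObjectURI": ["<OBJ>dbr:Christopher_Nolan"],
})

task = RDF_mask_task_dataset("/tmp/mask_task.csv")  # hypothetical path
task.write(rdf)  # emits 3 CSV rows: subject masked, relationship masked, object masked
task.close()
```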
diff --git a/Scripts/DataCleaning/data_output_models/rdf_text_tasks.py b/Scripts/DataCleaning/data_output_models/rdf_text_tasks.py
new file mode 100644
index 0000000..918e600
--- /dev/null
+++ b/Scripts/DataCleaning/data_output_models/rdf_text_tasks.py
@@ -0,0 +1,26 @@
+import pandas as pd
+
+class RDF_text_task_dataset():
+    """
+    Write the CSV for the first two tasks, which are "Generating structured RDF triples from natural language text" and its reverse.
+    In the CSV all of a movie's RDFs are saved together as a single string.
+    CSV composition: ["MovieID","RDFs","Abstract"]
+    """
+    def __init__(self, output_path: str):
+
+        self.output = open(output_path, "w")
+        # write the header row first
+        header = ["MovieID","RDFs","Abstract"]
+        self.output.write(",".join(header) + "\n")
+
+    def close(self):
+        self.output.close()
+
+    def write(self, RDF: pd.DataFrame):
+        """
+        Args:
+            RDF (pd.DataFrame): ["MovieID","Triple","Abstract"]
+        """
+
+        RDF.to_csv(self.output, index=False, header=False)
\ No newline at end of file
diff --git a/Scripts/DataCleaning/filter.py b/Scripts/DataCleaning/filter.py
new file mode 100644
index 0000000..50d6ead
--- /dev/null
+++ b/Scripts/DataCleaning/filter.py
@@ -0,0 +1,184 @@
+# This file deletes the unwanted relationships in the pipeline according to different rules
+import pandas as pd
+import sqlite3
+import numpy as np
+
+from Scripts.Libs.CleaningPipeline.special_token import SpecialToken
+from Scripts.Libs.CleaningPipeline.sql_endpoint import SqlEndpoint
+
+
+class PipelineApplier():
+
+    def __init__(self):
+
+        self.MOVIE_FILTER = pd.DataFrame()
+        self.REL_FILTER = pd.DataFrame()
+
+    def delete_relationship_by_str(self, RDF: pd.DataFrame, uri: str) -> pd.DataFrame:
+        return RDF[RDF["RelationshipURI"] != uri]
+
+    def generate_list_relationship_filter(self, filter_list: list[str]) -> None:
+        """Store the RelationshipURI filters as a set"""
+        self.relationship_filter_list: set[str] = set(filter_list)
+
+    def delete_relationship_by_list_filter(self, RDF: pd.DataFrame) -> pd.DataFrame:
+        """Remove rows whose RelationshipURI is in the stored filter. Generate it first by calling generate_list_relationship_filter()"""
+        return RDF[~RDF["RelationshipURI"].isin(self.relationship_filter_list)]
+
+    def generate_frequency_movie_filter(self, MOVIE_COUNT: pd.DataFrame, min_threshold: int, max_threshold: int):
+        """
+        You MUST call this before filtering the dataset by movie frequency [filter_by_frequency_movie_id()],
+        since this method creates that filter
+        Args:
+            MOVIE_COUNT (pd.DataFrame): ["MovieID","Count"]
+            min_threshold (int):
+            max_threshold (int):
+        """
+        MOVIE_COUNT = MOVIE_COUNT[MOVIE_COUNT["Count"] >= min_threshold]
+        MOVIE_COUNT = MOVIE_COUNT[MOVIE_COUNT["Count"] < max_threshold]
+        self.MOVIE_FILTER = MOVIE_COUNT  # ["MovieID"]
+
+    def generate_frequency_relationship_filter(self, REL_COUNT: pd.DataFrame, min_threshold: int, max_threshold: int):
+        REL_COUNT = REL_COUNT[REL_COUNT["Count"] >= min_threshold]
+        REL_COUNT = REL_COUNT[REL_COUNT["Count"] < max_threshold]
+        self.REL_FILTER = REL_COUNT  # ["RelationshipURI"]
+
+    def filter_by_frequency_movie_id(self, RDF: pd.DataFrame) -> pd.DataFrame:
+        RDF = RDF[RDF["MovieID"].isin(self.MOVIE_FILTER["MovieID"])]
+        return RDF
+
+    def filter_by_frequency_relationship(self, RDF: pd.DataFrame) -> pd.DataFrame:
+        RDF = RDF[RDF["RelationshipURI"].isin(self.REL_FILTER["RelationshipURI"])]
+        return RDF
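A sketch of the frequency-filter flow on toy data (the counts are invented; the thresholds match the ones used later in pipeline.py):

```python
import pandas as pd
from Scripts.DataCleaning.filter import PipelineApplier

applier = PipelineApplier()

# invented counts: movie 1 is too rare, movie 3 too frequent
movie_counts = pd.DataFrame({"MovieID": [1, 2, 3], "Count": [10, 80, 5000]})
applier.generate_frequency_movie_filter(movie_counts, 50, 3000)

rdf = pd.DataFrame({"MovieID": [1, 2, 3], "RelationshipURI": ["a", "b", "c"]})
print(applier.filter_by_frequency_movie_id(rdf))  # keeps only the MovieID == 2 row
```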
+    def rdf_add_special_token(self, RDF: pd.DataFrame):
+        """
+        Adds the RDF special tokens to each element of the triple, i.e. SUBJ to SubjectURI, OBJ to ObjectURI, REL to RelationshipURI.
+        Check Scripts/Libs/CleaningPipeline/special_token.py for the up-to-date special tokens.
+        It only adds the special tokens of the three elements of the RDF, no other special token.
+        Args:
+            RDF (pd.DataFrame):
+        Returns:
+            pd.DataFrame: ["MovieID","SubjectURI","RelationshipURI","ObjectURI","Abstract"]
+        """
+        # if a filter that ran before sliced the RDF and created a view, the copy resolves the problem
+        # for more context: SettingWithCopyWarning
+        RDF = RDF.copy()
+        # at the beginning of SubjectURI, RelationshipURI and ObjectURI, add their special token
+        RDF["SubjectURI"] = SpecialToken.SUBJECT.value + RDF["SubjectURI"]
+        RDF["ObjectURI"] = SpecialToken.OBJECT.value + RDF["ObjectURI"]
+        RDF["RelationshipURI"] = SpecialToken.RELATIONSHIP.value + RDF["RelationshipURI"]
+        return RDF
+
+    def drop_na_from_dataset(self, RDF: pd.DataFrame) -> pd.DataFrame:
+        # the dataset has SubjectURI, RelationshipURI and ObjectURI;
+        # we want to drop the rows with '' in them
+        # Replace empty strings with NaN
+        RDF = RDF.replace('', np.nan)
+        # Drop rows where any of the key columns are NaN
+        RDF = RDF.dropna(subset=["SubjectURI", "RelationshipURI", "ObjectURI"])
+        return RDF
+
+    def rebuild_by_movie(self, RDF: pd.DataFrame) -> pd.DataFrame:
+        """
+        Collapses all the triples of a movie into a single row.
+
+        Args:
+            RDF (pd.DataFrame): ["MovieID","SubjectURI","RelationshipURI","ObjectURI","Abstract"]
+
+        Returns:
+            pd.DataFrame: ["MovieID","Triple","Abstract"]
+        """
+        # to execute this method you must have iterated by movie_id,
+        # because by design we want one row per movie at the end
+        # MovieID and abstract can be given as input for a more generic method
+        # movie_id = RDF["MovieID"].iloc[0]
+        # abstract = RDF["Abstract"].iloc[0]
+        # first combine each row, creating the column "Triple" as the join of the RDF
+        RDF["Triple"] = RDF["SubjectURI"] + RDF["RelationshipURI"] + RDF["ObjectURI"]
+        # special token
+        RDF["Triple"] = SpecialToken.START_TRIPLE.value + RDF["Triple"] + SpecialToken.END_TRIPLE.value
+        # combine rows into one
+        # MovieID and Abstract are unique for each other 1 <-> 1
+        RDF = RDF.groupby(["MovieID", "Abstract"])["Triple"].apply("".join).reset_index()
+        # add the special tokens for: start of triple list and start of abstract
+        RDF["Triple"] = SpecialToken.START_TRIPLE_LIST.value + RDF["Triple"]
+        RDF["Abstract"] = SpecialToken.ABSTRACT.value + RDF["Abstract"]
+        return RDF[["MovieID","Triple","Abstract"]]
+
+    def group_by_movie_from_triple(self, RDF: pd.DataFrame) -> pd.DataFrame:
+        """
+        Args:
+            RDF (pd.DataFrame): ["MovieID","Triple","Abstract"]
+
+        Returns:
+            pd.DataFrame: ["MovieID","Triple","Abstract"]
+        """
+        # combine rows into one
+        # MovieID and Abstract are unique for each other 1 <-> 1
+        RDF = RDF.groupby(["MovieID", "Abstract"])["Triple"].apply("".join).reset_index()
+        # add the special tokens for: start of triple list and start of abstract
+        RDF["Triple"] = SpecialToken.START_TRIPLE_LIST.value + RDF["Triple"]
+        RDF["Abstract"] = SpecialToken.ABSTRACT.value + RDF["Abstract"]
+        return RDF[["MovieID","Triple","Abstract"]]
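How the groupby-join collapses a movie's rows, assuming the placeholder token spellings used in special_token.py below (all values invented):

```python
import pandas as pd
from Scripts.DataCleaning.filter import PipelineApplier

applier = PipelineApplier()
rdf = pd.DataFrame({
    "MovieID": [1, 1],
    "SubjectURI": ["<SUBJ>dbr:Inception"] * 2,
    "RelationshipURI": ["<REL>dbo:director", "<REL>dbo:starring"],
    "ObjectURI": ["<OBJ>dbr:Christopher_Nolan", "<OBJ>dbr:Leonardo_DiCaprio"],
    "Abstract": ["A thief ..."] * 2,
})
out = applier.rebuild_by_movie(rdf)
# one row: Triple = "<START_TRIPLE_LIST><START_TRIPLE>...<END_TRIPLE><START_TRIPLE>...<END_TRIPLE>"
#          Abstract = "<ABSTRACT>A thief ..."
print(out)
```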
+
+    @staticmethod
+    def build_triple(RDF: pd.DataFrame):
+        """
+        Joins the RDF triple into one element, together with the START and END special tokens
+        Args:
+            RDF (pd.DataFrame): at least ["SubjectURI", "RelationshipURI", "ObjectURI"]
+        Returns:
+            pd.DataFrame: RDF["Triple"] (just this column)
+        """
+        # combine each row, creating the column "Triple" as the join of the RDF
+        RDF["Triple"] = RDF["SubjectURI"] + RDF["RelationshipURI"] + RDF["ObjectURI"]
+        # special token
+        RDF["Triple"] = SpecialToken.START_TRIPLE.value + RDF["Triple"] + SpecialToken.END_TRIPLE.value
+        return RDF["Triple"]
+
+    @staticmethod
+    def build_incomplete_triple(RDF: pd.DataFrame):
+        """
+        Helper method used for the third task: "Predicting a masked component within an RDF triple".
+        Joins the RDF triple into one element, together with the START and END special tokens.
+        The missing element is replaced by the MASK special token
+        Args:
+            RDF (pd.DataFrame): 2 of the following ["SubjectURI", "RelationshipURI", "ObjectURI"]
+        Returns:
+            RDF["Triple"]: pd.Series (just this column, NOT A DATAFRAME)
+        """
+        # create a new column "Triple" with the joined RDF
+
+        # the following creates a column of MASK tokens with the length of the DataFrame;
+        # it is not strictly needed, since we expect exactly one column to be missing, but it is more robust (AND SLOWER)
+        MISSING = pd.Series([SpecialToken.MASK.value] * len(RDF), index=RDF.index)
+
+        RDF["Triple"] = (
+            RDF.get("SubjectURI", MISSING)
+            + RDF.get("RelationshipURI", MISSING)
+            + RDF.get("ObjectURI", MISSING))
+        # special token
+        RDF["Triple"] = SpecialToken.START_TRIPLE.value + RDF["Triple"] + SpecialToken.END_TRIPLE.value
+        return RDF["Triple"]
+
+    @staticmethod
+    def build_for_mask_task(RDF_incomplete: pd.DataFrame, MISSING: pd.DataFrame) -> pd.DataFrame:
+        # currently not used
+        """
+        Helper method used for the third task: "Predicting a masked component within an RDF triple".
+        Given two DataFrames, the first containing the incomplete RDFs and the other only the missing components,
+        this method applies the special token. Currently an unimplemented stub.
+        Args:
+            RDF_incomplete (pd.DataFrame): the triples with one component missing
+            MISSING (pd.DataFrame): the missing components
+
+        Returns:
+            pd.DataFrame: None for now
+        """
+        # sketch: takes two DataFrames as input, one with 2 columns (the incomplete
+        # triple, e.g. ["SubjectURI", ...]) and one with the missing component
+        return None
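The two static builders in action, showing the `DataFrame.get` fallback that swaps the dropped column for MASK tokens (URIs and token spellings invented):

```python
import pandas as pd
from Scripts.DataCleaning.filter import PipelineApplier

rdf = pd.DataFrame({
    "SubjectURI": ["<SUBJ>dbr:Inception"],
    "RelationshipURI": ["<REL>dbo:director"],
    "ObjectURI": ["<OBJ>dbr:Christopher_Nolan"],
})

full = PipelineApplier.build_triple(rdf.copy())  # copy: build_triple adds a column in place
masked = PipelineApplier.build_incomplete_triple(rdf.drop(columns=["SubjectURI"]))
print(full.iloc[0])    # <START_TRIPLE><SUBJ>...<REL>...<OBJ>...<END_TRIPLE>
print(masked.iloc[0])  # <START_TRIPLE><MASK><REL>...<OBJ>...<END_TRIPLE>
```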
"w3:2000/01/rdf-schema#comment", + "dbp-dbo:thumbnail", "foaf:depiction", "w3:1999/02/22-rdf-syntax-ns#type"] + self.filter_applier.generate_list_relationship_filter(relationship_uri_banned_list) + + + def _end_file_handler(self): + self.task_bpe_corpus.close() + self.task_rdf_mask.close() + self.task_rdf_text.close() + self.task_rdf_completation.close() + + def _get_cleaned_movie_rows(self): + for RDF in self.sql_endpoint.get_abbreviated_dataset_by_movie_id(): + RDF = self.filter_applier.drop_na_from_dataset(RDF) + RDF = self.filter_applier.filter_by_frequency_movie_id(RDF) + RDF = self.filter_applier.filter_by_frequency_relationship(RDF) + # other filter + # + RDF = self.filter_applier.delete_relationship_by_list_filter(RDF) + if RDF.empty: + continue + RDF = self.filter_applier.rdf_add_special_token(RDF) # WARNING, THIS MUST BE DONE AFTER FILTER BY FREQUENCE + yield RDF + + def execute_task_bpe_corpus(self): + for RDF in self._get_cleaned_movie_rows(): + RDF = self.filter_applier.rebuild_by_movie(RDF) + RDF = RDF[["Triple","Abstract"]] + self.task_bpe_corpus.write_from_df(RDF) + self._end_file_handler() + + + def execute_task_rdf_mask(self): + for RDF in self._get_cleaned_movie_rows(): + self.task_rdf_mask.write(RDF) + self._end_file_handler() + + def execute_tasks_rdf_text(self): + for RDF in self._get_cleaned_movie_rows(): + RDF = self.filter_applier.rebuild_by_movie(RDF) + self.task_rdf_text.write(RDF) + self._end_file_handler() + + def execute_task_rdf_completation(self): + for RDF in self._get_cleaned_movie_rows(): + RDF["Triple"] = self.filter_applier.build_triple(RDF) + self.task_rdf_completation.write(RDF[["MovieID","Triple"]]) + self._end_file_handler() + + + def execute_all_task(self): + for RDF in self._get_cleaned_movie_rows(): + self.task_rdf_mask.write(RDF) + + RDF["Triple"] = self.filter_applier.build_triple(RDF) + self.task_rdf_completation.write(RDF[["MovieID","Triple"]]) + + RDF = self.filter_applier.group_by_movie_from_triple(RDF[["MovieID","Triple","Abstract"]]) + + self.task_rdf_text.write(RDF) + self.task_bpe_corpus.write_from_df(RDF[["Triple","Abstract"]]) + + self._end_file_handler() + + + + + + + +pipeline = Pipeline("./Assets/Dataset/Tmp/output.txt") +# pipeline.execute_task_bpe_corpus() +# pipeline.execute_task_rdf_mask() +# pipeline.execute_tasks_rdf_text() +# pipeline.execute_task_rdf_completation() +pipeline.execute_all_task() \ No newline at end of file diff --git a/Scripts/Libs/CleaningPipeline/special_token.py b/Scripts/Libs/CleaningPipeline/special_token.py new file mode 100644 index 0000000..644ad71 --- /dev/null +++ b/Scripts/Libs/CleaningPipeline/special_token.py @@ -0,0 +1,21 @@ +from enum import Enum + +class SpecialToken(str, Enum): + # (Enum, str) -> throws an error + START_TRIPLE_LIST = "" + START_TRIPLE = "" + END_TRIPLE = "" + SUBJECT = "" + RELATIONSHIP = "" + OBJECT = "" + ABSTRACT = "" + CORPUS_END = "" + + ## Tasks' Token + RDF_TO_TEXT = "" + TEXT_TO_RDF = "" + CONTINUE_RDF = "" + MASK = "" + + #BPE Training: + \ No newline at end of file diff --git a/Scripts/Libs/CleaningPipeline/sql_endpoint.py b/Scripts/Libs/CleaningPipeline/sql_endpoint.py new file mode 100644 index 0000000..4e43528 --- /dev/null +++ b/Scripts/Libs/CleaningPipeline/sql_endpoint.py @@ -0,0 +1,144 @@ +####################################################### +# This file stand as endpoint to interact with DB # +####################################################### + +# import sqlite3 +import pandas as pd +from sqlalchemy import create_engine +from 
diff --git a/Scripts/Libs/CleaningPipeline/sql_endpoint.py b/Scripts/Libs/CleaningPipeline/sql_endpoint.py
new file mode 100644
index 0000000..4e43528
--- /dev/null
+++ b/Scripts/Libs/CleaningPipeline/sql_endpoint.py
@@ -0,0 +1,144 @@
+########################################################
+#  This file stands as the endpoint to interact with the DB  #
+########################################################
+
+# import sqlite3
+import pandas as pd
+from sqlalchemy import create_engine
+from Scripts.Libs.CleaningPipeline.special_token import SpecialToken
+
+
+class SqlEndpoint():
+
+    def __init__(self, DB_PATH="./Assets/Dataset/DatawareHouse/dataset.db", chunk_size_row=500):
+        # self.CONN = sqlite3.connect(DB_PATH)  # DEPRECATED
+        self.sql_engine = create_engine(f"sqlite:///{DB_PATH}")
+        # ///  3 slashes -> relative path
+        # //// 4 slashes -> absolute path
+        # self.conn = self.sql_engine.connect().execution_options(stream_results=True)
+        # it seems that sqlite doesn't support streaming cursors
+        # PRAGMA tuning performs better for writing than for reading
+        self.chunk_size_row = chunk_size_row
+
+    def get_RDF(self) -> pd.DataFrame:
+
+        QUERY = """
+        SELECT MovieID, SubjectURI, RelationshipURI, ObjectURI
+        FROM RDFs
+        INNER JOIN Subjects USING (SubjectID)
+        INNER JOIN Relationships USING (RelationshipID)
+        INNER JOIN Objects USING (ObjectID);
+        """
+
+        return pd.read_sql_query(QUERY, self.sql_engine)
+
+    def get_chunked_abbreviated_dataset(self):
+        """
+        Returns:
+            Iterator[pd.DataFrame]: chunks of [MovieID, SubjectURI, RelationshipURI, ObjectURI, Abstract]
+        """
+
+        QUERY = """
+        SELECT MovieID, SubjectURI, RelationshipURI, ObjectURI, Abstract
+        FROM RDFs
+        INNER JOIN ParsedSubjects USING (SubjectID)
+        INNER JOIN ParsedRelationships USING (RelationshipID)
+        INNER JOIN ParsedObjects USING (ObjectID)
+        INNER JOIN WikipediaAbstracts USING (MovieID);
+        """
+
+        # return pd.read_sql_query(QUERY, self.CONN, chunksize=500)
+        # sqlite3
+        return pd.read_sql_query(QUERY, self.sql_engine, chunksize=self.chunk_size_row)
+
+    def get_chunked_abbreviated_dataset_with_start_token(self):
+        # DEPRECATED! broken: SpecialToken() cannot be instantiated and the query placeholders are never bound
+        start_token = SpecialToken()
+        QUERY = """
+        SELECT
+            MovieID,
+            ? || SubjectURI AS SubjectURI,
+            ? || RelationshipURI AS RelationshipURI,
+            ? || ObjectURI AS ObjectURI,
+            Abstract
+        FROM RDFs
+        INNER JOIN ParsedSubjects USING (SubjectID)
+        INNER JOIN ParsedRelationships USING (RelationshipID)
+        INNER JOIN ParsedObjects USING (ObjectID)
+        INNER JOIN WikipediaAbstracts USING (MovieID);
+        """
+        return pd.read_sql_query(QUERY, self.sql_engine, chunksize=self.chunk_size_row)
+
+    def get_abbreviated_dataset_by_movie_id(self):  # -> Iterator[pd.DataFrame]
+        """
+        Yields one DataFrame per movie (with all of that movie's rows in the dataset).
+        The retrieved RDFs are already abbreviated by the SQL parser
+        Yields:
+            pandas.DataFrame: [MovieID, SubjectURI, RelationshipURI, ObjectURI, Abstract]
+        """
+        # chunk by MovieID: the abstract is the same within a movie and some interesting logic becomes applicable
+        movie_ids = pd.read_sql_query("SELECT MovieID FROM Movies;", self.sql_engine)["MovieID"]
+        # CHOSEN MOVIES:
+        # The Dark Knight  : 117248
+        # Inception        : 147074
+        # The Avengers     : 113621
+        # Cast Away        : 1123
+        # The Departed     : 117586
+        # American Psycho  : 90177
+        # Avatar           : 71587
+        # Django Unchained : 138952
+        # Spirited Away    : 144137
+        # Knives Out       : 148025
+        movie_list = [117248, 147074, 113621, 1123, 117586, 90177, 71587, 138952, 144137, 148025]
+        # DEBUG: restrict the run to the fixed list above
+        movie_ids = movie_list
+
+        QUERY = """
+        SELECT MovieID, SubjectURI, RelationshipURI, ObjectURI, Abstract
+        FROM RDFs
+        INNER JOIN ParsedSubjects USING (SubjectID)
+        INNER JOIN ParsedRelationships USING (RelationshipID)
+        INNER JOIN ParsedObjects USING (ObjectID)
+        INNER JOIN WikipediaAbstracts USING (MovieID)
+        WHERE MovieID = ?;
+        """
+
+        for movie_id in movie_ids:
+            yield pd.read_sql_query(QUERY, self.sql_engine, params=(movie_id,))
+
+    def get_movies_id_count(self) -> pd.DataFrame:
+        """
+        Gets the number of RDF rows for each movie in the dataset
+        Returns:
+            pandas.DataFrame: [MovieID, Count]
+        """
+        QUERY = """
+        SELECT MovieID, COUNT(*) AS Count
+        FROM RDFs
+        GROUP BY MovieID;
+        """
+        return pd.read_sql_query(QUERY, self.sql_engine)
+
+    def get_relationship_count(self) -> pd.DataFrame:
+        """
+        Gets the number of RDF rows for each relationship in the dataset
+        Returns:
+            pandas.DataFrame: [RelationshipURI, Count]
+        """
+        QUERY = """
+        SELECT RelationshipURI, COUNT(*) AS Count
+        FROM RDFs
+        INNER JOIN ParsedRelationships USING (RelationshipID)
+        GROUP BY RelationshipURI;
+        """
+        return pd.read_sql_query(QUERY, self.sql_engine)
+
+
+if __name__ == "__main__":
+    sql_endpoint = SqlEndpoint()
+    for pandas_row in sql_endpoint.get_abbreviated_dataset_by_movie_id():
+        print(pandas_row)
+    # sql_endpoint.get_RDF()
+    print("done")
\ No newline at end of file
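A sketch of consuming the chunked reader; it assumes the SQLite file at the constructor's default path exists with the schema queried above:

```python
from Scripts.Libs.CleaningPipeline.sql_endpoint import SqlEndpoint

endpoint = SqlEndpoint(chunk_size_row=500)
for chunk in endpoint.get_chunked_abbreviated_dataset():
    # each chunk is a DataFrame of up to 500 rows:
    # [MovieID, SubjectURI, RelationshipURI, ObjectURI, Abstract]
    print(len(chunk))
```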
diff --git a/Scripts/Libs/Utils/dataframe_interaction.py b/Scripts/Libs/Utils/dataframe_interaction.py
new file mode 100644
index 0000000..c4df33a
--- /dev/null
+++ b/Scripts/Libs/Utils/dataframe_interaction.py
@@ -0,0 +1,9 @@
+import pandas as pd
+
+
+def get_raw_from_dataframe(DF: pd.DataFrame) -> str:
+    # concatenate every cell of every row into one raw string;
+    # joining a generator avoids the quadratic cost of repeated string +=
+    return "".join(
+        "".join(map(str, row)) for row in DF.itertuples(index=False, name=None)
+    )
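Behavior sketch of `get_raw_from_dataframe` on toy values:

```python
import pandas as pd
from Scripts.Libs.Utils.dataframe_interaction import get_raw_from_dataframe

df = pd.DataFrame({"Triple": ["t1", "t2"], "Abstract": ["a1", "a2"]})
print(get_raw_from_dataframe(df))  # "t1a1t2a2"
```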