# cleaner.py — deletes/cleans the unwanted relationships in the pipeline by different rules.
import pandas as pd
import sqlite3
import numpy as np

from Scripts.Libs.CleaningPipeline.special_token import SpecialToken
from Scripts.Libs.CleaningPipeline.sql_endpoint import SqlEndpoint


class PipelineApplier():
    """Collection of cleaning transforms applied to per-movie RDF DataFrames.

    Every method takes a DataFrame and returns a DataFrame (or Series);
    none of them mutates the caller's frame.
    """

    def __init__(self):
        pass

    def rdf_add_special_token(self, RDF: pd.DataFrame) -> pd.DataFrame:
        """Prefix each element of the triple with its RDF special token.

        I.e.: SUBJ to SubjectURI, OBJ to ObjectURI, REL to RelationshipURI.
        Check Scripts/Libs/CleaningPipeline/special_token.py for the
        up-to-date special tokens. Only the three triple columns are
        touched; no other special token is added.

        Args:
            RDF (pd.DataFrame): at least ["SubjectURI", "RelationshipURI", "ObjectURI"]
        Returns:
            pd.DataFrame: ["MovieURI","SubjectURI","RelationshipURI","ObjectURI","Abstract"]
        """
        # If a filter that ran before sliced the RDF and created a view, copying
        # here avoids SettingWithCopyWarning and accidental caller mutation.
        RDF = RDF.copy()
        # At the beginning of SubjectURI / RelationshipURI / ObjectURI, add their special token.
        RDF["SubjectURI"] = SpecialToken.SUBJECT.value + RDF["SubjectURI"]
        RDF["ObjectURI"] = SpecialToken.OBJECT.value + RDF["ObjectURI"]
        RDF["RelationshipURI"] = SpecialToken.RELATIONSHIP.value + RDF["RelationshipURI"]
        return RDF

    def drop_na_from_dataset(self, RDF: pd.DataFrame) -> pd.DataFrame:
        """Drop rows whose triple is incomplete (empty string or NaN in a key column)."""
        # Empty strings count as missing values too.
        RDF = RDF.replace('', np.nan)
        # Drop rows where any of the key columns are NaN.
        RDF = RDF.dropna(subset=["SubjectURI", "RelationshipURI", "ObjectURI"])
        return RDF

    def rebuild_by_movie(self, RDF: pd.DataFrame) -> pd.DataFrame:
        """Collapse all triples of each movie into a single row.

        To execute this method you have to have iterated by movie_id, because
        by design we want one row per movie at the end. MovieID and Abstract
        are assumed to be unique for each other (1 <-> 1).

        Args:
            RDF (pd.DataFrame): ["MovieID","SubjectURI","RelationshipURI","ObjectURI","Abstract"]
        Returns:
            pd.DataFrame: ["MovieID","Triple","Abstract"]
        """
        RDF = RDF.copy()  # never mutate the caller's frame
        # First combine each row, creating column "Triple" as the join of the RDF parts.
        RDF["Triple"] = RDF["SubjectURI"] + RDF["RelationshipURI"] + RDF["ObjectURI"]
        # Wrap each triple in its START/END special tokens.
        RDF["Triple"] = SpecialToken.START_TRIPLE.value + RDF["Triple"] + SpecialToken.END_TRIPLE.value
        # Combine the rows of each movie into one.
        RDF = RDF.groupby(["MovieID", "Abstract"])["Triple"].apply("".join).reset_index()
        # Add the special tokens for start-of-triple-list and start-of-abstract.
        RDF["Triple"] = SpecialToken.START_TRIPLE_LIST.value + RDF["Triple"]
        RDF["Abstract"] = SpecialToken.ABSTRACT.value + RDF["Abstract"]
        return RDF[["MovieID", "Triple", "Abstract"]]

    @staticmethod
    def build_triple(RDF: pd.DataFrame) -> pd.Series:
        """Join each row's RDF parts into one string wrapped in START/END special tokens.

        Args:
            RDF (pd.DataFrame): at least ["SubjectURI", "RelationshipURI", "ObjectURI"]
        Returns:
            pd.Series: the joined "Triple" values (just this column).

        Note: the previous version also wrote a "Triple" column into RDF as a
        side effect; every caller reassigns the returned Series, so the input
        frame is now left untouched.
        """
        triple = RDF["SubjectURI"] + RDF["RelationshipURI"] + RDF["ObjectURI"]
        return SpecialToken.START_TRIPLE.value + triple + SpecialToken.END_TRIPLE.value

    def regex_on_objects(self, RDF: pd.DataFrame) -> pd.DataFrame:
        """Normalize ObjectURI text: newlines become ", " and asterisks are removed."""
        RDF = RDF.copy()  # consistent with rdf_add_special_token: never mutate the input
        RDF["ObjectURI"] = (RDF["ObjectURI"].astype("string")
                            .str.replace(r"\r?\n+", ", ", regex=True)  # newlines -> ", "
                            .str.replace(r"\*", "", regex=True))       # delete all asterisks
        return RDF
import pandas as pd
from Scripts.Libs.CleaningPipeline.sql_endpoint import SqlEndpoint


class MovieFilter:
    """Maintains a shrinking set of MovieIDs (self.MOVIE_FILTER) refined by filters."""

    def __init__(self) -> None:
        self.sql_endpoint = SqlEndpoint()
        # First obtain all movie ids; each filter below narrows the set down.
        movie_query = "SELECT MovieID FROM Movies"
        self.MOVIE_FILTER = self.sql_endpoint.get_dataframe_from_query(movie_query)

    def frequency_filter(self, min_treshold: int, max_treshold: int) -> None:
        """Keep only movies whose total RDF triple count lies in [min_treshold, max_treshold]."""
        movie_ids = self.MOVIE_FILTER["MovieID"].to_list()
        movie_list_placeholder = ",".join(["?"] * len(movie_ids))

        # The thresholds are bound as parameters; the previous version
        # interpolated them into the SQL text via the f-string.
        filter_query = f"""
            SELECT MovieID
            FROM RDFs
            WHERE MovieID IN ({movie_list_placeholder})
            GROUP BY MovieID
            HAVING COUNT(*) BETWEEN ? AND ?;
        """
        params = tuple(movie_ids) + (min_treshold, max_treshold)
        self.MOVIE_FILTER = self.sql_endpoint.get_dataframe_from_query(filter_query, params)

    def get_movie_id(self) -> pd.DataFrame:
        """Return the current filtered DataFrame of MovieIDs."""
        return self.MOVIE_FILTER

    def relation_filter(self, parsed_rel_uri: str, min_treshold: int, max_treshold: int) -> None:
        """Keep only movies having between min and max triples with the given relationship URI."""
        movie_ids = self.MOVIE_FILTER["MovieID"].to_list()
        movie_list_placeholder = ",".join(["?"] * len(movie_ids))

        # parsed_rel_uri and the thresholds are bound as parameters — the
        # previous version interpolated them into the SQL string, which is
        # injection-prone and breaks on URIs containing a quote.
        filter_query = f"""
            SELECT MovieID
            FROM RDFs
            JOIN ParsedRelationships ON ParsedRelationships.RelationshipID = RDFs.RelationshipID
            WHERE MovieID IN ({movie_list_placeholder})
            GROUP BY MovieID
            HAVING SUM(CASE WHEN ParsedRelationships.RelationshipURI = ? THEN 1 ELSE 0 END)
                   BETWEEN ? AND ?;
        """
        # Placeholder order in the query: movie ids, then the URI, then the thresholds.
        params = tuple(movie_ids) + (parsed_rel_uri, min_treshold, max_treshold)
        self.MOVIE_FILTER = self.sql_endpoint.get_dataframe_from_query(filter_query, params)


# movie_filter = MovieFilter()
# movie_filter.frequency_filter(5,10)
from movie_filter import MovieFilter
from relationship_filter import RelationshipFilter
from rdf_filter import RdfFilter
from cleaner import PipelineApplier

from Scripts.DataCleaning.data_output_models.bpe_corpus import BPE_corpus
from Scripts.DataCleaning.data_output_models.rdf_text_tasks import RDF_text_task_dataset
from Scripts.DataCleaning.data_output_models.rdf_completation_task import RDF_completation_task_dataset
from Scripts.DataCleaning.data_output_models.debug_csv import Debug_csv

import pandas as pd

# Relationship URIs excluded from every task: wiki plumbing, image/layout
# attributes and other metadata relationships.
# NOTE: the original list contained "dbp-dbp:n" twice; the duplicate was removed.
RELATIONSHIP_FILTER_LIST = [
    "dbp-dbp:wikiPageUsesTemplate", "w3:2000/01/rdf-schema#label", "dbp-dbo:abstract",
    "dbp-dbo:wikiPageID", "dbp-dbo:wikiPageRevisionID", "dbp-dbo:wikiPageDisambiguates",
    "w3:2002/07/owl#sameAs", "dbp-dbp:image", "dbp-dbo:wikiPageLength", "w3:2000/01/rdf-schema#comment",
    "dbp-dbo:thumbnail", "foaf:depiction", "w3:1999/02/22-rdf-syntax-ns#type",
    "dbp-dbp:id", "dbp-dbp:totalWidth", "w3:ns/prov#wasDerivedFrom", "dbp-dbp:n", "dbp-dbp:alt",
    "dbp-dbo:soundRecording", "dbp-dbp:align", "dbp-dbp:format",
    "dbp-dbp:filename", "dbp-dbp:wikt", "foaf:isPrimaryTopicOf", "dbp-dbp:quote", "foaf:homepage",
    "dbp-dbp:wordnet_type", "dbp-dbp:length",
]


class Pipeline():
    """Wires the movie/relationship/RDF filters to the task output writers."""

    def __init__(self) -> None:
        self._movie_filter = MovieFilter()
        self._relationship_filter = RelationshipFilter()
        self._rdf_filter = RdfFilter()
        self._pipeline = PipelineApplier()

        self.task_bpe_corpus = BPE_corpus("./Assets/Dataset/Tmp/corpus.txt")
        self.task_rdf_text = RDF_text_task_dataset("./Assets/Dataset/Tmp/rdf_text.csv")
        self.task_rdf_completation = RDF_completation_task_dataset("./Assets/Dataset/Tmp/rdf_completation.csv")

        # Default filtering applied up-front.
        self._movie_filter.frequency_filter(50, 3000)
        self._relationship_filter.frequency_filter(50, 2395627)  # from 2718 to 3069
        self._relationship_filter.delete_relationship_uri_by_list(RELATIONSHIP_FILTER_LIST)

    def other_filter(self):
        """Optional extra movie filters on the frequency of specific relationships."""
        self._movie_filter.relation_filter("purl:dc/terms/subject", 5, 100)
        self._movie_filter.relation_filter("dbp-dbo:director", 1, 100)

    def _get_cleaned_movie_rows(self):
        """Yield one cleaned per-movie RDF DataFrame at a time (empty frames skipped)."""
        movie_ids = self._movie_filter.get_movie_id()
        rel_ids = self._relationship_filter.get_relationship_id()

        for RDF in self._rdf_filter.yield_movie_abbreviated_rdfs(movie_ids, rel_ids):
            RDF = self._pipeline.drop_na_from_dataset(RDF)
            RDF = self._pipeline.regex_on_objects(RDF)
            RDF = self._pipeline.rdf_add_special_token(RDF)

            if RDF.empty:
                continue
            yield RDF

    def execute_task_bpe_corpus(self):
        """Write the BPE corpus: one <triples, abstract> text row per movie."""
        for RDF in self._get_cleaned_movie_rows():
            RDF = self._pipeline.rebuild_by_movie(RDF)
            RDF = RDF[["Triple", "Abstract"]]
            self.task_bpe_corpus.write_from_df(RDF)
        self._end_file_handler()

    def execute_tasks_rdf_text(self):
        """Write the RDF->text task dataset (MovieID, Triple, Abstract per movie)."""
        for RDF in self._get_cleaned_movie_rows():
            RDF = self._pipeline.rebuild_by_movie(RDF)
            self.task_rdf_text.write(RDF)
        self._end_file_handler()

    def execute_task_rdf_completation(self):
        """Write the RDF-completion task dataset (one row per triple)."""
        for RDF in self._get_cleaned_movie_rows():
            RDF["Triple"] = self._pipeline.build_triple(RDF)
            self.task_rdf_completation.write(RDF[["MovieID", "Triple"]])
        self._end_file_handler()

    def _end_file_handler(self):
        # Closes ALL task writers, even the ones the current task never wrote to;
        # NOTE(review): presumably their close() is harmless on untouched files — confirm.
        self.task_bpe_corpus.close()
        self.task_rdf_text.close()
        self.task_rdf_completation.close()

    def execute_all_task(self):
        """Produce all three outputs in a single pass over the cleaned movies."""
        for RDF in self._get_cleaned_movie_rows():
            # The completion task needs the per-triple frame, so work on a copy
            # before rebuild_by_movie collapses it to one row per movie.
            completation_RDF = RDF.copy()
            completation_RDF["Triple"] = self._pipeline.build_triple(completation_RDF)
            self.task_rdf_completation.write(completation_RDF[["MovieID", "Triple"]])

            RDF = self._pipeline.rebuild_by_movie(RDF)

            self.task_rdf_text.write(RDF)
            self.task_bpe_corpus.write_from_df(RDF[["Triple", "Abstract"]])

        self._end_file_handler()

    def use_toy_dataset(self):
        """Replace the movie filter with a fixed list of 10 well-known movies.

        CHOSEN MOVIES:
            The Dark Knight : 117248
            Inception       : 147074
            The Avengers    : 113621
            Cast Away       : 1123
            The Departed    : 117586
            American Psycho : 90177
            Avatar          : 71587
            Django Unchained: 138952
            Spirited Away   : 144137
            Knives Out      : 148025
        """
        movie_list = [117248, 147074, 113621, 1123, 117586, 90177, 71587, 138952, 144137, 148025]
        self._movie_filter.MOVIE_FILTER = pd.DataFrame({"MovieID": movie_list})

    def generate_csv_debug_file(self, debug_path: str):
        """Dump every cleaned per-movie frame to a CSV for manual inspection."""
        debug_csv = Debug_csv(debug_path)

        for RDF in self._get_cleaned_movie_rows():
            debug_csv.write(RDF)

        debug_csv.close()


# Guarded so importing this module no longer runs the whole pipeline as a side effect.
if __name__ == "__main__":
    pipe = Pipeline()
    pipe.use_toy_dataset()
    pipe.other_filter()
    # pipe.execute_all_task()
    pipe.generate_csv_debug_file("Assets/Dataset/Tmp/debug.csv")
import pandas as pd
from Scripts.Libs.CleaningPipeline.sql_endpoint import SqlEndpoint


class RdfFilter:
    """Streams the abbreviated RDF rows of each selected movie out of the database."""

    def __init__(self) -> None:
        self.sql_endpoint = SqlEndpoint()

    def yield_movie_abbreviated_rdfs(self, MOVIE_ID: pd.DataFrame, REL_ID: pd.DataFrame):
        """Generator yielding one DataFrame per movie, restricted to the allowed relationships.

        Args:
            MOVIE_ID (pd.DataFrame): column "MovieID" — the movies to fetch.
            REL_ID (pd.DataFrame): column "RelationshipID" — relationships to keep.
        Yields:
            pd.DataFrame: ["MovieID","SubjectURI","RelationshipURI","ObjectURI","Abstract"]
        """
        # The relationship ids are invariant across movies: bind them once.
        rel_ids = tuple(REL_ID["RelationshipID"].to_list())
        relationship_placeholder = ",".join("?" * len(rel_ids))

        query = f"""
        SELECT MovieID, SubjectURI, RelationshipURI, ObjectURI, Abstract
        FROM RDFs
        INNER JOIN ParsedSubjects USING (SubjectID)
        INNER JOIN ParsedRelationships USING (RelationshipID)
        INNER JOIN ParsedObjects USING (ObjectID)
        INNER JOIN WikipediaAbstracts USING (MovieID)
        WHERE MovieID = (?) AND RelationshipID IN ({relationship_placeholder});
        """

        # One query per movie: the movie id is the first bound parameter,
        # followed by the relationship ids.
        for current_movie_id in MOVIE_ID["MovieID"].to_list():
            bound = (current_movie_id,) + rel_ids
            yield self.sql_endpoint.get_dataframe_from_query(query, params=bound)
import pandas as pd
from Scripts.Libs.CleaningPipeline.sql_endpoint import SqlEndpoint


class RelationshipFilter:
    """Maintains a shrinking set of RelationshipIDs (self.RELATIONSHIP_FILTER)."""

    def __init__(self) -> None:
        self.sql_endpoint = SqlEndpoint()
        # First obtain all relationship ids; each filter below narrows the set down.
        relationship_query = "SELECT RelationshipID FROM Relationships"
        self.RELATIONSHIP_FILTER = self.sql_endpoint.get_dataframe_from_query(relationship_query)

    def frequency_filter(self, min_treshold: int, max_treshold: int) -> None:
        """Keep only relationships whose RDF usage count lies in [min_treshold, max_treshold]."""
        rel_ids = self.RELATIONSHIP_FILTER["RelationshipID"].to_list()
        # (Was misnamed movie_list_placeholder — these are relationship ids.)
        rel_list_placeholder = ",".join(["?"] * len(rel_ids))

        # The thresholds are bound as parameters; the previous version
        # interpolated them into the SQL text via the f-string.
        filter_query = f"""
            SELECT RelationshipID
            FROM RDFs
            WHERE RelationshipID IN ({rel_list_placeholder})
            GROUP BY RelationshipID
            HAVING COUNT(*) BETWEEN ? AND ?;
        """
        params = tuple(rel_ids) + (min_treshold, max_treshold)
        self.RELATIONSHIP_FILTER = self.sql_endpoint.get_dataframe_from_query(filter_query, params)

    def get_relationship_id(self) -> pd.DataFrame:
        """Return the current filtered DataFrame of RelationshipIDs."""
        return self.RELATIONSHIP_FILTER

    def delete_relationship_uri_by_list(self, filter_list: list[str]) -> None:
        """Remove from the filter every relationship whose URI appears in filter_list."""
        rel_ids = self.RELATIONSHIP_FILTER["RelationshipID"].to_list()
        ids_placeholder = ",".join(["?"] * len(rel_ids))
        uri_placeholder = ",".join(["?"] * len(filter_list))

        filter_query = f"""
            SELECT RelationshipID
            FROM ParsedRelationships
            WHERE RelationshipID IN ({ids_placeholder})
              AND RelationshipURI NOT IN ({uri_placeholder});
        """
        # Placeholder order in the query: relationship ids first, then the URIs.
        params = tuple(rel_ids) + tuple(filter_list)
        self.RELATIONSHIP_FILTER = self.sql_endpoint.get_dataframe_from_query(filter_query, params)