new faster pipeline

This commit is contained in:
GassiGiuseppe 2025-10-05 14:57:45 +02:00
parent 255d801a80
commit acb43fc899
5 changed files with 333 additions and 0 deletions

View File

@ -0,0 +1,86 @@
# This file deletes in the pipeline the unwanted relationship by different rules
import pandas as pd
import sqlite3
import numpy as np
from Scripts.Libs.CleaningPipeline.special_token import SpecialToken
from Scripts.Libs.CleaningPipeline.sql_endpoint import SqlEndpoint
class PipelineApplier():
def __init__(self):
pass
def rdf_add_special_token(self, RDF: pd.DataFrame):
"""
Adds RDF special token to each element of the tuple. i.e: SUBJ to SubjectURI, OBJ to ObjectURI, REL to RelationshipURI.
Check Scrits/Libs/CleaningPipeline/special_token.py for the up-to-date special token.
It only adds the special token of the three element of the RDF, no other special token.
Args:
RDF (pd.DataFrame):
Returns:
pd.DataFrame: ["MovieURI","SubjectURI","RelationshipURI","ObjectURI","Abstract"]
"""
# if the filter runned before sliced the RDF and created a View, here the problem is resolved
# for more context: SettingWithCopyWarning
RDF = RDF.copy()
# at the beginning of SubjectURI RelationshipURI ObjectURI, add their special token
RDF["SubjectURI"] = SpecialToken.SUBJECT.value + RDF["SubjectURI"]
RDF["ObjectURI"] = SpecialToken.OBJECT.value + RDF["ObjectURI"]
RDF["RelationshipURI"] = SpecialToken.RELATIONSHIP.value + RDF["RelationshipURI"]
return RDF
def drop_na_from_dataset(self, RDF: pd.DataFrame) -> pd.DataFrame:
RDF = RDF.replace('', np.nan)
# Drop rows where any of the key columns are NaN
RDF = RDF.dropna(subset=["SubjectURI", "RelationshipURI", "ObjectURI"])
return RDF
def rebuild_by_movie(self, RDF: pd.DataFrame) -> pd.DataFrame:
"""
Args:
RDF (pd.DataFrame): ["MovieID","SubjectURI","RelationshipURI","ObjectURI","Abstract"]
Returns:
pd.DataFrame: ["MovieID","Triple","Abstract"]
"""
# to execute this method you have to have itereted by movie_id
# because as design we want at the end one row for each movie
# MovieID and abstract can be given as input for a more generic method
# first let's combine each row creating column triple as join of rdf
RDF["Triple"] = RDF["SubjectURI"] + RDF["RelationshipURI"] + RDF["ObjectURI"]
# special token
RDF["Triple"] = SpecialToken.START_TRIPLE.value + RDF["Triple"] + SpecialToken.END_TRIPLE.value
# combine rows into one
# MovieID and Abstract are unique for each other 1 <-> 1
RDF = RDF.groupby(["MovieID", "Abstract"])["Triple"].apply("".join).reset_index()
# add special token for: start of triple, end of triple and start of abstract
RDF["Triple"] = SpecialToken.START_TRIPLE_LIST.value + RDF["Triple"]
RDF["Abstract"] = SpecialToken.ABSTRACT.value + RDF["Abstract"]
return RDF[["MovieID","Triple","Abstract"]]
@staticmethod
def build_triple(RDF: pd.DataFrame):
"""
Obtains joined RDF triple in one element, togheter with START and END special token
Args:
RDF (pd.DataFrame): at least ["SubjectURI", "RelationshipURI", "ObjectURI"]
Returns:
pd.DataFrame: RDF["Triple"] (just this column)
"""
# let's combine each row creating column triple as join of rdf
RDF["Triple"] = RDF["SubjectURI"] + RDF["RelationshipURI"] + RDF["ObjectURI"]
# special token
RDF["Triple"] = SpecialToken.START_TRIPLE.value + RDF["Triple"] + SpecialToken.END_TRIPLE.value
return RDF["Triple"]
def regex_on_objects(self, RDF: pd.DataFrame) -> pd.DataFrame:
RDF["ObjectURI"] = (RDF["ObjectURI"].astype("string")
.str.replace(r"\r?\n+", ", ", regex=True) # newlines -> ", "
.str.replace(r"\*", "", regex=True)) # delete all asterisks
return RDF

View File

@ -0,0 +1,49 @@
import pandas as pd
from Scripts.Libs.CleaningPipeline.sql_endpoint import SqlEndpoint
class MovieFilter:
def __init__(self) -> None:
self.sql_endpoint = SqlEndpoint()
# first obtain all movie_id
movie_query = "SELECT MovieID FROM Movies"
self.MOVIE_FILTER = self.sql_endpoint.get_dataframe_from_query(movie_query)
def frequency_filter(self, min_treshold:int, max_treshold:int):
movie_list_placeholder = ",".join(["?"] * len(self.MOVIE_FILTER))
filter_query = f"""
SELECT MovieID
FROM RDFs
WHERE MovieID IN ({movie_list_placeholder})
GROUP BY MovieID
HAVING COUNT(*) BETWEEN {min_treshold} AND {max_treshold};
"""
self.MOVIE_FILTER = self.sql_endpoint.get_dataframe_from_query(filter_query, tuple(self.MOVIE_FILTER["MovieID"].to_list()))
def get_movie_id(self):
return self.MOVIE_FILTER
def relation_filter(self, parsed_rel_uri: str, min_treshold:int, max_treshold:int):
movie_ids = self.MOVIE_FILTER["MovieID"].to_list()
movie_list_placeholder = ",".join(["?"] * len(movie_ids))
filter_query = f"""
SELECT MovieID
FROM RDFs
JOIN ParsedRelationships ON ParsedRelationships.RelationshipID = RDFs.RelationshipID
WHERE MovieID IN ({movie_list_placeholder})
GROUP BY MovieID
HAVING SUM(CASE WHEN ParsedRelationships.RelationshipURI = '{parsed_rel_uri}' THEN 1 ELSE 0 END)
BETWEEN {min_treshold} AND {max_treshold};
"""
params = tuple(movie_ids) # + (parsed_rel_uri, min_treshold, max_treshold)
self.MOVIE_FILTER = self.sql_endpoint.get_dataframe_from_query(filter_query, params)
# movie_filter = MovieFilter()
# movie_filter.frequency_filter(5,10)

View File

@ -0,0 +1,129 @@
from movie_filter import MovieFilter
from relationship_filter import RelationshipFilter
from rdf_filter import RdfFilter
from cleaner import PipelineApplier
from Scripts.DataCleaning.data_output_models.bpe_corpus import BPE_corpus
from Scripts.DataCleaning.data_output_models.rdf_text_tasks import RDF_text_task_dataset
from Scripts.DataCleaning.data_output_models.rdf_completation_task import RDF_completation_task_dataset
from Scripts.DataCleaning.data_output_models.debug_csv import Debug_csv
import pandas as pd
RELATIONSHIP_FILTER_LIST = [
"dbp-dbp:wikiPageUsesTemplate","w3:2000/01/rdf-schema#label","dbp-dbo:abstract",
"dbp-dbo:wikiPageID","dbp-dbo:wikiPageRevisionID", "dbp-dbo:wikiPageDisambiguates",
"w3:2002/07/owl#sameAs","dbp-dbp:image","dbp-dbo:wikiPageLength", "w3:2000/01/rdf-schema#comment",
"dbp-dbo:thumbnail", "foaf:depiction", "w3:1999/02/22-rdf-syntax-ns#type",
"dbp-dbp:id","dbp-dbp:totalWidth", "w3:ns/prov#wasDerivedFrom", "dbp-dbp:n", "dbp-dbp:alt",
"dbp-dbo:soundRecording", "dbp-dbp:align", "dbp-dbp:format", "dbp-dbp:n",
"dbp-dbp:filename", "dbp-dbp:wikt", "foaf:isPrimaryTopicOf", "dbp-dbp:quote", "foaf:homepage",
"dbp-dbp:wordnet_type", "dbp-dbp:length"
]
class Pipeline():
def __init__(self) -> None:
self._movie_filter = MovieFilter()
self._relationship_filter = RelationshipFilter()
self._rdf_filter = RdfFilter()
self._pipeline = PipelineApplier()
self.task_bpe_corpus = BPE_corpus("./Assets/Dataset/Tmp/corpus.txt")
self.task_rdf_text = RDF_text_task_dataset("./Assets/Dataset/Tmp/rdf_text.csv")
self.task_rdf_completation = RDF_completation_task_dataset("./Assets/Dataset/Tmp/rdf_completation.csv")
self._movie_filter.frequency_filter(50,3000)
self._relationship_filter.frequency_filter(50, 2395627) # from 2718 to 3069
self._relationship_filter.delete_relationship_uri_by_list(RELATIONSHIP_FILTER_LIST)
def other_filter(self):
self._movie_filter.relation_filter("purl:dc/terms/subject",5,100)
self._movie_filter.relation_filter("dbp-dbo:director",1,100)
def _get_cleaned_movie_rows(self):
movie_ids = self._movie_filter.get_movie_id()
rel_ids = self._relationship_filter.get_relationship_id()
for RDF in self._rdf_filter.yield_movie_abbreviated_rdfs(movie_ids,rel_ids):
RDF = self._pipeline.drop_na_from_dataset(RDF)
RDF = self._pipeline.regex_on_objects(RDF)
RDF = self._pipeline.rdf_add_special_token(RDF)
if RDF.empty:
continue
yield RDF
def execute_task_bpe_corpus(self):
for RDF in self._get_cleaned_movie_rows():
RDF = self._pipeline.rebuild_by_movie(RDF)
RDF = RDF[["Triple","Abstract"]]
self.task_bpe_corpus.write_from_df(RDF)
self._end_file_handler()
def execute_tasks_rdf_text(self):
for RDF in self._get_cleaned_movie_rows():
RDF = self._pipeline.rebuild_by_movie(RDF)
self.task_rdf_text.write(RDF)
self._end_file_handler()
def execute_task_rdf_completation(self):
for RDF in self._get_cleaned_movie_rows():
RDF["Triple"] = self._pipeline.build_triple(RDF)
self.task_rdf_completation.write(RDF[["MovieID","Triple"]])
self._end_file_handler()
def _end_file_handler(self):
self.task_bpe_corpus.close()
self.task_rdf_text.close()
self.task_rdf_completation.close()
def execute_all_task(self):
for RDF in self._get_cleaned_movie_rows():
completation_RDF = RDF.copy()
completation_RDF["Triple"] = self._pipeline.build_triple(completation_RDF)
self.task_rdf_completation.write(completation_RDF[["MovieID","Triple"]])
RDF = self._pipeline.rebuild_by_movie(RDF)
self.task_rdf_text.write(RDF)
self.task_bpe_corpus.write_from_df(RDF[["Triple","Abstract"]])
self._end_file_handler()
def use_toy_dataset(self):
# CHOOSEN MOVIE:
# The Dark Knight : 117248
# Inception : 147074
# The Avengers : 113621
# Cast Away : 1123
# The Departed : 117586
# American Psycho : 90177
# Avatar : 71587
# Django Unchained : 138952
# Spirited Away : 144137
# Knives Out : 148025
movie_list = [117248, 147074, 113621, 1123, 117586, 90177, 71587, 138952, 144137, 148025]
self._movie_filter.MOVIE_FILTER = pd.DataFrame({"MovieID": movie_list})
def generate_csv_debug_file(self, debug_path:str):
debug_csv = Debug_csv(debug_path)
for RDF in self._get_cleaned_movie_rows():
debug_csv.write(RDF)
debug_csv.close()
pipe = Pipeline()
pipe.use_toy_dataset()
pipe.other_filter()
# pipe.execute_all_task()
pipe.generate_csv_debug_file("Assets/Dataset/Tmp/debug.csv")

View File

@ -0,0 +1,28 @@
import pandas as pd
from Scripts.Libs.CleaningPipeline.sql_endpoint import SqlEndpoint
class RdfFilter:
def __init__(self) -> None:
self.sql_endpoint = SqlEndpoint()
def yield_movie_abbreviated_rdfs(self, MOVIE_ID: pd.DataFrame, REL_ID: pd.DataFrame):
relationship_placeholder = ",".join(["?"] * len(REL_ID))
param = tuple(REL_ID["RelationshipID"].to_list())
QUERY = f"""
SELECT MovieID, SubjectURI, RelationshipURI, ObjectURI, Abstract
FROM RDFs
INNER JOIN ParsedSubjects USING (SubjectID)
INNER JOIN ParsedRelationships USING (RelationshipID)
INNER JOIN ParsedObjects USING (ObjectID)
INNER JOIN WikipediaAbstracts USING (MovieID)
WHERE MovieID = (?) AND RelationshipID IN ({relationship_placeholder});
"""
for movie_id in MOVIE_ID["MovieID"].to_list():
params = (movie_id,) + param
yield self.sql_endpoint.get_dataframe_from_query(QUERY, params=params)

View File

@ -0,0 +1,41 @@
import pandas as pd
from Scripts.Libs.CleaningPipeline.sql_endpoint import SqlEndpoint
class RelationshipFilter:
def __init__(self) -> None:
self.sql_endpoint = SqlEndpoint()
# first obtain all relationship_id
relationship_query = "SELECT RelationshipID FROM Relationships"
self.RELATIONSHIP_FILTER = self.sql_endpoint.get_dataframe_from_query(relationship_query)
def frequency_filter(self, min_treshold:int, max_treshold:int):
movie_list_placeholder = ",".join(["?"] * len(self.RELATIONSHIP_FILTER))
filter_query = f"""
SELECT RelationshipID
FROM RDFs
WHERE RelationshipID IN ({movie_list_placeholder})
GROUP BY RelationshipID
HAVING COUNT(*) BETWEEN {min_treshold} AND {max_treshold};
"""
self.RELATIONSHIP_FILTER = self.sql_endpoint.get_dataframe_from_query(filter_query, tuple(self.RELATIONSHIP_FILTER["RelationshipID"].to_list()))
def get_relationship_id(self):
return self.RELATIONSHIP_FILTER
def delete_relationship_uri_by_list(self, filter_list: list[str]):
ids_placeholder = ",".join(["?"] * len(self.RELATIONSHIP_FILTER))
uri_placeholder = ",".join(["?"] * len(filter_list))
filter_query = f"""
SELECT RelationshipID
FROM ParsedRelationships
WHERE RelationshipID IN ({ids_placeholder})
AND RelationshipURI NOT IN ({uri_placeholder});
"""
params = tuple(self.RELATIONSHIP_FILTER["RelationshipID"].to_list()) + tuple(filter_list)
self.RELATIONSHIP_FILTER = self.sql_endpoint.get_dataframe_from_query(filter_query, params)