Added Toy Dataset entry point into the Pipeline class
Before it was forced into the sql_endpoint, now all the pipeline can be managed in the Pipeline class
This commit is contained in:
parent
bd72ad3571
commit
8167c9d435
@ -10,22 +10,22 @@ from Scripts.DataCleaning.data_output_models.rdf_completation_task import RDF_co
|
|||||||
import pandas as pd
|
import pandas as pd
|
||||||
|
|
||||||
class Pipeline():
|
class Pipeline():
|
||||||
def __init__(self, output):
|
def __init__(self):
|
||||||
self.sql_endpoint = SqlEndpoint()
|
self.sql_endpoint = SqlEndpoint()
|
||||||
# classes to manage taskes' datasets
|
# classes to manage taskes' datasets
|
||||||
self.task_rdf_mask = RDF_mask_task_dataset("./Assets/Dataset/Tmp/debug.csv")
|
self.task_rdf_mask = RDF_mask_task_dataset("./Assets/Dataset/Tmp/rdf_mask.csv")
|
||||||
self.task_bpe_corpus = BPE_corpus("./Assets/Dataset/Tmp/output.txt")
|
self.task_bpe_corpus = BPE_corpus("./Assets/Dataset/Tmp/corpus.txt")
|
||||||
self.task_rdf_text = RDF_text_task_dataset("./Assets/Dataset/Tmp/rdf_text.csv")
|
self.task_rdf_text = RDF_text_task_dataset("./Assets/Dataset/Tmp/rdf_text.csv")
|
||||||
self.task_rdf_completation = RDF_completation_task_dataset("./Assets/Dataset/Tmp/rdf_completation.csv")
|
self.task_rdf_completation = RDF_completation_task_dataset("./Assets/Dataset/Tmp/rdf_completation.csv")
|
||||||
|
|
||||||
# prepare the filter
|
# prepare the filter
|
||||||
# the filter applier needs to know thefrequence of Movies and Relationship among all the Dataset
|
# the filter applier needs to know the frequence of Movies and Relationship among all the Dataset
|
||||||
self.filter_applier = PipelineApplier()
|
self.filter_applier = PipelineApplier()
|
||||||
MOVIE_COUNT = self.sql_endpoint.get_movies_id_count()
|
MOVIE_COUNT = self.sql_endpoint.get_movies_id_count()
|
||||||
REL_COUNT = self.sql_endpoint.get_relationship_count()
|
REL_COUNT = self.sql_endpoint.get_relationship_count()
|
||||||
self.filter_applier.generate_frequency_movie_filter(MOVIE_COUNT,50,3000)
|
self.filter_applier.generate_frequency_movie_filter(MOVIE_COUNT,50,3000)
|
||||||
self.filter_applier.generate_frequency_relationship_filter(REL_COUNT, 50, 2395627)
|
self.filter_applier.generate_frequency_relationship_filter(REL_COUNT, 50, 2395627)
|
||||||
# prepare the filter ot the relationshipURI you want to delete:
|
# prepare the filter on the relationshipURI you want to delete:
|
||||||
relationship_uri_banned_list = [
|
relationship_uri_banned_list = [
|
||||||
"dbp-dbp:wikiPageUsesTemplate","w3:2000/01/rdf-schema#label","dbp-dbo:abstract",
|
"dbp-dbp:wikiPageUsesTemplate","w3:2000/01/rdf-schema#label","dbp-dbo:abstract",
|
||||||
"dbp-dbo:wikiPageID","dbp-dbo:wikiPageRevisionID", "dbp-dbo:wikiPageDisambiguates",
|
"dbp-dbo:wikiPageID","dbp-dbo:wikiPageRevisionID", "dbp-dbo:wikiPageDisambiguates",
|
||||||
@ -34,25 +34,6 @@ class Pipeline():
|
|||||||
self.filter_applier.generate_list_relationship_filter(relationship_uri_banned_list)
|
self.filter_applier.generate_list_relationship_filter(relationship_uri_banned_list)
|
||||||
|
|
||||||
|
|
||||||
def _end_file_handler(self):
|
|
||||||
self.task_bpe_corpus.close()
|
|
||||||
self.task_rdf_mask.close()
|
|
||||||
self.task_rdf_text.close()
|
|
||||||
self.task_rdf_completation.close()
|
|
||||||
|
|
||||||
def _get_cleaned_movie_rows(self):
|
|
||||||
for RDF in self.sql_endpoint.get_abbreviated_dataset_by_movie_id():
|
|
||||||
RDF = self.filter_applier.drop_na_from_dataset(RDF)
|
|
||||||
RDF = self.filter_applier.filter_by_frequency_movie_id(RDF)
|
|
||||||
RDF = self.filter_applier.filter_by_frequency_relationship(RDF)
|
|
||||||
# other filter
|
|
||||||
#
|
|
||||||
RDF = self.filter_applier.delete_relationship_by_list_filter(RDF)
|
|
||||||
if RDF.empty:
|
|
||||||
continue
|
|
||||||
RDF = self.filter_applier.rdf_add_special_token(RDF) # WARNING, THIS MUST BE DONE AFTER FILTER BY FREQUENCE
|
|
||||||
yield RDF
|
|
||||||
|
|
||||||
def execute_task_bpe_corpus(self):
|
def execute_task_bpe_corpus(self):
|
||||||
for RDF in self._get_cleaned_movie_rows():
|
for RDF in self._get_cleaned_movie_rows():
|
||||||
RDF = self.filter_applier.rebuild_by_movie(RDF)
|
RDF = self.filter_applier.rebuild_by_movie(RDF)
|
||||||
@ -66,12 +47,14 @@ class Pipeline():
|
|||||||
self.task_rdf_mask.write(RDF)
|
self.task_rdf_mask.write(RDF)
|
||||||
self._end_file_handler()
|
self._end_file_handler()
|
||||||
|
|
||||||
|
|
||||||
def execute_tasks_rdf_text(self):
|
def execute_tasks_rdf_text(self):
|
||||||
for RDF in self._get_cleaned_movie_rows():
|
for RDF in self._get_cleaned_movie_rows():
|
||||||
RDF = self.filter_applier.rebuild_by_movie(RDF)
|
RDF = self.filter_applier.rebuild_by_movie(RDF)
|
||||||
self.task_rdf_text.write(RDF)
|
self.task_rdf_text.write(RDF)
|
||||||
self._end_file_handler()
|
self._end_file_handler()
|
||||||
|
|
||||||
|
|
||||||
def execute_task_rdf_completation(self):
|
def execute_task_rdf_completation(self):
|
||||||
for RDF in self._get_cleaned_movie_rows():
|
for RDF in self._get_cleaned_movie_rows():
|
||||||
RDF["Triple"] = self.filter_applier.build_triple(RDF)
|
RDF["Triple"] = self.filter_applier.build_triple(RDF)
|
||||||
@ -92,14 +75,55 @@ class Pipeline():
|
|||||||
self.task_bpe_corpus.write_from_df(RDF[["Triple","Abstract"]])
|
self.task_bpe_corpus.write_from_df(RDF[["Triple","Abstract"]])
|
||||||
|
|
||||||
self._end_file_handler()
|
self._end_file_handler()
|
||||||
|
|
||||||
|
|
||||||
|
def _end_file_handler(self):
|
||||||
|
self.task_bpe_corpus.close()
|
||||||
|
self.task_rdf_mask.close()
|
||||||
|
self.task_rdf_text.close()
|
||||||
|
self.task_rdf_completation.close()
|
||||||
|
|
||||||
|
|
||||||
|
def _get_cleaned_movie_rows(self):
|
||||||
|
for RDF in self.sql_endpoint.get_abbreviated_dataset_by_movie_id():
|
||||||
|
RDF = self.filter_applier.drop_na_from_dataset(RDF)
|
||||||
|
RDF = self.filter_applier.filter_by_frequency_movie_id(RDF)
|
||||||
|
RDF = self.filter_applier.filter_by_frequency_relationship(RDF)
|
||||||
|
# other filter
|
||||||
|
#
|
||||||
|
RDF = self.filter_applier.delete_relationship_by_list_filter(RDF)
|
||||||
|
if RDF.empty:
|
||||||
|
continue
|
||||||
|
RDF = self.filter_applier.rdf_add_special_token(RDF) # WARNING, THIS MUST BE DONE AFTER FILTER BY FREQUENCE
|
||||||
|
yield RDF
|
||||||
|
|
||||||
|
|
||||||
|
def use_toy_dataset(self):
|
||||||
|
# CHOOSEN MOVIE:
|
||||||
|
# The Dark Knight : 117248
|
||||||
|
# Inception : 147074
|
||||||
|
# The Avengers : 113621
|
||||||
|
# Cast Away : 1123
|
||||||
|
# The Departed : 117586
|
||||||
|
# American Psycho : 90177
|
||||||
|
# Avatar : 71587
|
||||||
|
# Django Unchained : 138952
|
||||||
|
# Spirited Away : 144137
|
||||||
|
# Knives Out : 148025
|
||||||
|
movie_list = [117248, 147074, 113621, 1123, 117586, 90177, 71587, 138952, 144137, 148025]
|
||||||
|
self.sql_endpoint.movie_ids = movie_list
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# there are a lot of settings to manage
|
||||||
|
# you only need to change settings:
|
||||||
|
# in the init for file paths, frequency filter limit, banned reletionshipURI
|
||||||
|
# in the use_toy_dataset , to change the toy dataset
|
||||||
|
# in _get_cleaned_movie_rows: to change how the pipeline behave
|
||||||
|
|
||||||
|
pipeline = Pipeline()
|
||||||
|
|
||||||
|
# pipeline.use_toy_dataset()
|
||||||
pipeline = Pipeline("./Assets/Dataset/Tmp/output.txt")
|
|
||||||
# pipeline.execute_task_bpe_corpus()
|
# pipeline.execute_task_bpe_corpus()
|
||||||
# pipeline.execute_task_rdf_mask()
|
# pipeline.execute_task_rdf_mask()
|
||||||
# pipeline.execute_tasks_rdf_text()
|
# pipeline.execute_tasks_rdf_text()
|
||||||
|
|||||||
@ -18,8 +18,8 @@ class SqlEndpoint():
|
|||||||
# self.conn = self.sql_engine.connect().execution_options(stream_results=True)
|
# self.conn = self.sql_engine.connect().execution_options(stream_results=True)
|
||||||
# it seems that sqlite doenst support streamer cursor
|
# it seems that sqlite doenst support streamer cursor
|
||||||
# PRAGMA exeutes better in writing not reading
|
# PRAGMA exeutes better in writing not reading
|
||||||
self.chunk_size_row = chunk_size_row
|
self.chunk_size_row = chunk_size_row # not used now, since each chunk is a movie
|
||||||
pass
|
self.movie_ids = movie_ids = pd.read_sql_query("SELECT MovieID FROM Movies;", self.sql_engine)["MovieID"]
|
||||||
|
|
||||||
def get_RDF(self) -> pd.DataFrame :
|
def get_RDF(self) -> pd.DataFrame :
|
||||||
|
|
||||||
@ -79,7 +79,7 @@ class SqlEndpoint():
|
|||||||
Pandas.DataFrame: [MovieID, SubjectURI, RelationshipURI, ObjectURI, Abstract]
|
Pandas.DataFrame: [MovieID, SubjectURI, RelationshipURI, ObjectURI, Abstract]
|
||||||
"""
|
"""
|
||||||
# chunk by movieId, abstract is the same and some intersting logic are appliable
|
# chunk by movieId, abstract is the same and some intersting logic are appliable
|
||||||
movie_ids = pd.read_sql_query("SELECT MovieID FROM Movies;", self.sql_engine)["MovieID"]
|
# movie_ids = pd.read_sql_query("SELECT MovieID FROM Movies;", self.sql_engine)["MovieID"]
|
||||||
# CHOOSEN MOVIE:
|
# CHOOSEN MOVIE:
|
||||||
# The Dark Knight : 117248
|
# The Dark Knight : 117248
|
||||||
# Inception : 147074
|
# Inception : 147074
|
||||||
@ -91,8 +91,8 @@ class SqlEndpoint():
|
|||||||
# Django Unchained : 138952
|
# Django Unchained : 138952
|
||||||
# Spirited Away : 144137
|
# Spirited Away : 144137
|
||||||
# Knives Out : 148025
|
# Knives Out : 148025
|
||||||
movie_list = [117248, 147074, 113621, 1123, 117586, 90177, 71587, 138952, 144137, 148025]
|
# movie_list = [117248, 147074, 113621, 1123, 117586, 90177, 71587, 138952, 144137, 148025]
|
||||||
movie_ids = movie_list
|
# movie_ids = movie_list
|
||||||
|
|
||||||
QUERY = """
|
QUERY = """
|
||||||
SELECT MovieID, SubjectURI, RelationshipURI, ObjectURI, Abstract
|
SELECT MovieID, SubjectURI, RelationshipURI, ObjectURI, Abstract
|
||||||
@ -104,7 +104,7 @@ class SqlEndpoint():
|
|||||||
WHERE MovieID = (?);
|
WHERE MovieID = (?);
|
||||||
"""
|
"""
|
||||||
|
|
||||||
for movie_id in movie_ids:
|
for movie_id in self.movie_ids:
|
||||||
yield pd.read_sql_query(QUERY, self.sql_engine, params=(movie_id,))
|
yield pd.read_sql_query(QUERY, self.sql_engine, params=(movie_id,))
|
||||||
|
|
||||||
def get_movies_id_count(self) -> pd.DataFrame:
|
def get_movies_id_count(self) -> pd.DataFrame:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user