diff --git a/Scripts/DataCleaning/data_output_models/rdf_mask_task.py b/Scripts/DataCleaning/data_output_models/rdf_mask_task.py
index 01b943d..f9debc8 100644
--- a/Scripts/DataCleaning/data_output_models/rdf_mask_task.py
+++ b/Scripts/DataCleaning/data_output_models/rdf_mask_task.py
@@ -1,7 +1,7 @@
 import pandas as pd
 
 # do not worry about circular dependencies, this class will never call something else
-from Scripts.DataCleaning.filter import PipelineApplier
+from Scripts.DataCleaning.legacy.filter import PipelineApplier
 
 class RDF_mask_task_dataset():
     """
diff --git a/Scripts/DataCleaning/hold_out/divide.py b/Scripts/DataCleaning/hold_out/divide.py
new file mode 100644
index 0000000..1b50f8a
--- /dev/null
+++ b/Scripts/DataCleaning/hold_out/divide.py
@@ -0,0 +1,29 @@
+import pandas as pd
+
+def split_csv_by_percent(csv_path, train=70, val=15, test=15, seed=42):
+    # 1) Read and shuffle rows with a fixed seed for reproducibility
+    df = pd.read_csv(csv_path).sample(frac=1, random_state=seed).reset_index(drop=True)
+
+    # 2) Turn the three inputs into proportions relative to their sum,
+    #    so the percentages need not add up to exactly 100
+    total = train + val + test
+    n = len(df)
+    n_train = int(n * train / total)  # floor to keep indices integral
+    n_val = int(n * val / total)
+    # 3) Give the remainder to test to ensure every row is assigned
+    #    (this naturally absorbs any rounding loss)
+    train_df = df.iloc[:n_train].reset_index(drop=True)
+    val_df = df.iloc[n_train:n_train + n_val].reset_index(drop=True)
+    test_df = df.iloc[n_train + n_val:].reset_index(drop=True)
+
+    return train_df, val_df, test_df
+
+# usage:
+DATASET = "Assets/Dataset/Tmp/rdf_text.csv"
+TRAIN = "Assets/Dataset/Tmp/hold_out/train.csv"
+TEST = "Assets/Dataset/Tmp/hold_out/test.csv"
+EVALUATION = "Assets/Dataset/Tmp/hold_out/evaluation.csv"
+train_df, val_df, test_df = split_csv_by_percent(DATASET, train=80, val=10, test=10, seed=42)
+
+# index=False keeps the row index out of the CSVs, so re-reading them
+# does not introduce a spurious "Unnamed: 0" column
+train_df.to_csv(TRAIN, index=False)
+val_df.to_csv(EVALUATION, index=False)
+test_df.to_csv(TEST, index=False)
diff --git a/Scripts/DataCleaning/legacy/deprecated.py b/Scripts/DataCleaning/legacy/deprecated.py
new file mode 100644
index 0000000..3628430
--- /dev/null
+++ b/Scripts/DataCleaning/legacy/deprecated.py
@@ -0,0 +1,381 @@
+# This module removes unwanted relationships from the pipeline according to different rules
+# -----------------------------------------------------------------------------
+# SQL-FIRST VERSION
+# -----------------------------------------------------------------------------
+# In the original (pandas) version this module:
+#   - stored frequency filters in DataFrames,
+#   - filtered/cleaned DataFrames in-memory,
+#   - added special tokens via string ops,
+#   - rebuilt one row per movie using groupby/aggregation.
+#
+# In this rewrite:
+#   - Every transformation RETURNS a SQLAlchemy `Select` object instead of a DataFrame.
+#   - Your pipeline can pass this `Select` (a "dataview") from one stage to the next,
+#     composing more SQL lazily. Nothing is executed until you call `session.execute(...)`.
+#   - Frequency filters are represented as SUBSELECTS, applied with `WHERE IN (subquery)`.
+#
+# Notes:
+#   - We keep the same CLASS and METHOD NAMES to preserve call sites.
+#   - Method comments/docstrings from your original file are carried over and updated
+#     to reflect Select-based behavior and return types.
+#   - We drop pandas/numpy/sqlite3 imports because filtering is pushed into SQL.
+#   - `GROUP_CONCAT` is used for the rebuild phase (SQLite-compatible). For other DBs,
+#     swap with an equivalent string-agg function.
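+#
+# Illustrative sketch (not part of the pipeline): how a caller might compose and
+# execute these dataviews. `rdf_select` (a Select over MovieID, SubjectURI,
+# RelationshipURI, ObjectURI, Abstract) and a SQLAlchemy `session` are assumed:
+#
+#     applier = PipelineApplier()
+#     view = applier.delete_relationship_by_str(rdf_select, "w3:2002/07/owl#sameAs")
+#     view = applier.rdf_add_special_token(view)
+#     view = applier.rebuild_by_movie(view)
+#     rows = session.execute(view).all()  # SQL is emitted only at this point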
+# -----------------------------------------------------------------------------
+
+from __future__ import annotations
+from typing import Optional
+
+from sqlalchemy import select, func, literal
+from sqlalchemy.sql import Select
+
+from Scripts.Libs.CleaningPipeline.special_token import SpecialToken
+
+
+class PipelineApplier():
+    """
+    SQL-first pipeline applier.
+
+    In the pandas version, frequency filters were stored as DataFrames (self.MOVIE_FILTER / self.REL_FILTER)
+    and every method worked with/returned pandas.DataFrame. In this SQLAlchemy rewrite:
+
+    - self.MOVIE_FILTER and self.REL_FILTER become *subselects* (Select objects) that yield a single
+      column each (MovieID or RelationshipURI). These subselects can be applied via `WHERE IN (subquery)`.
+
+    - Every method that previously returned a DataFrame now returns a *Select* that represents the same
+      logical transformation, but pushed into the database engine.
+
+    - Comments and docstrings are updated to reflect SQL semantics while preserving your original intent.
+    """
+
+    def __init__(self):
+        # In the pandas version these were DataFrames storing allowed keys.
+        # Here they are Select objects (single-column subselects) or None.
+        # Expected column names:
+        #   - self.MOVIE_FILTER: "MovieID"
+        #   - self.REL_FILTER: "RelationshipURI"
+        self.MOVIE_FILTER: Optional[Select] = None
+        self.REL_FILTER: Optional[Select] = None
+
+    # -------------------------------------------------------------------------
+    # Relationship deletion
+    # -------------------------------------------------------------------------
+    def delete_relationship_by_str(self, RDF: Select, uri: str) -> Select:
+        """
+        Return a Select where rows having the given relationship URI are removed.
+
+        Original signature (pandas):
+            def delete_relationship_by_str(self, RDF: pd.DataFrame, uri: str) -> pd.DataFrame
+
+        Updated behavior:
+        - RDF is a Select with columns: MovieID, SubjectURI, RelationshipURI, ObjectURI, Abstract
+        - We apply a WHERE clause: RelationshipURI != <uri>
+        - Returns a Select you can continue composing.
+
+        Args:
+            RDF (Select): a selectable representing the RDF joined view
+            uri (str): RelationshipURI to exclude
+
+        Returns:
+            Select: filtered selectable (no execution yet)
+        """
+        sc = RDF.selected_columns
+        return RDF.where(sc.RelationshipURI != literal(uri))
+
+    # -------------------------------------------------------------------------
+    # Frequency filter: MOVIE
+    # -------------------------------------------------------------------------
+    def generate_frequency_movie_filter(self, MOVIE_COUNT: Select, min_treshold: int, max_treshold: int):
+        """
+        You MUST call this before filtering by movie frequency [filter_by_frequency_movie_id()],
+        since this method creates that filter.
+
+        Original behavior:
+        - Input MOVIE_COUNT as DataFrame ["MovieID","Count"]
+        - Keep rows where Count in [min_treshold, max_treshold)
+        - Store the filtered keys in self.MOVIE_FILTER
+
+        Updated behavior (SQL):
+        - MOVIE_COUNT is a Select that yields ["MovieID","Count"].
+        - We build and store a *subselect* of allowed MovieID (single column) to be used by WHERE IN.
+        - No query is executed here; we only create a new Select.
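+
+        Example (illustrative; `rdfs` is an assumed table with a MovieID column)::
+
+            counts = select(rdfs.c.MovieID, func.count().label("Count")).group_by(rdfs.c.MovieID)
+            applier.generate_frequency_movie_filter(counts, 50, 3000)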
+
+        Args:
+            MOVIE_COUNT (Select): yields columns MovieID, Count
+            min_treshold (int): inclusive lower bound on Count
+            max_treshold (int): exclusive upper bound on Count
+        """
+        sc = MOVIE_COUNT.selected_columns
+        filtered = MOVIE_COUNT.where(sc.Count >= min_treshold).where(sc.Count < max_treshold)
+        # Keep only the key column so it can be used in an IN (subquery)
+        self.MOVIE_FILTER = select(filtered.selected_columns.MovieID)
+
+    # -------------------------------------------------------------------------
+    # Frequency filter: RELATIONSHIP
+    # -------------------------------------------------------------------------
+    def generate_frequency_relationship_filter(self, REL_COUNT: Select, min_treshold: int, max_treshold: int):
+        """
+        Original behavior:
+        - Input REL_COUNT as DataFrame ["RelationshipURI","Count"]
+        - Keep rows where Count in [min_treshold, max_treshold)
+        - Store the filtered keys in self.REL_FILTER
+
+        Updated behavior (SQL):
+        - REL_COUNT is a Select that yields ["RelationshipURI","Count"].
+        - We build and store a *subselect* of allowed RelationshipURI (single column) to be used by WHERE IN.
+        - No query is executed here; we only create a new Select.
+
+        Args:
+            REL_COUNT (Select): yields columns RelationshipURI, Count
+            min_treshold (int): inclusive lower bound on Count
+            max_treshold (int): exclusive upper bound on Count
+        """
+        sc = REL_COUNT.selected_columns
+        filtered = REL_COUNT.where(sc.Count >= min_treshold).where(sc.Count < max_treshold)
+        self.REL_FILTER = select(filtered.selected_columns.RelationshipURI)
+
+    # -------------------------------------------------------------------------
+    # Apply frequency filters
+    # -------------------------------------------------------------------------
+    def filter_by_frequency_movie_id(self, RDF: Select) -> Select:
+        """
+        Original behavior (pandas):
+            RDF = RDF[RDF["MovieID"].isin(self.MOVIE_FILTER["MovieID"])]
+
+        Updated behavior (SQL):
+        - If self.MOVIE_FILTER is present, apply: WHERE MovieID IN (<subselect>)
+        - Otherwise, return RDF unchanged.
+
+        Args:
+            RDF (Select): current dataset
+
+        Returns:
+            Select: filtered dataset (or unchanged if no filter exists)
+        """
+        if self.MOVIE_FILTER is None:
+            return RDF
+        sc = RDF.selected_columns
+        return RDF.where(sc.MovieID.in_(self.MOVIE_FILTER))
+
+    def filter_by_frequency_relationship(self, RDF: Select) -> Select:
+        """
+        Original behavior (pandas):
+            RDF = RDF[RDF["RelationshipURI"].isin(self.REL_FILTER["RelationshipURI"])]
+
+        Updated behavior (SQL):
+        - If self.REL_FILTER is present, apply: WHERE RelationshipURI IN (<subselect>)
+        - Otherwise, return RDF unchanged.
+
+        Args:
+            RDF (Select): current dataset
+
+        Returns:
+            Select: filtered dataset (or unchanged if no filter exists)
+        """
+        if self.REL_FILTER is None:
+            return RDF
+        sc = RDF.selected_columns
+        return RDF.where(sc.RelationshipURI.in_(self.REL_FILTER))
+
+    # -------------------------------------------------------------------------
+    # Token prefixing (SubjectURI/RelationshipURI/ObjectURI)
+    # -------------------------------------------------------------------------
+    def rdf_add_special_token(self, RDF: Select) -> Select:
+        """
+        Adds the RDF special token to each element of the tuple, i.e.: SUBJ to SubjectURI,
+        OBJ to ObjectURI, REL to RelationshipURI. Check
+        Scripts/Libs/CleaningPipeline/special_token.py for the up-to-date special tokens.
+
+        It only adds the special tokens of the three elements of the RDF; no other special token.
+
+        Original behavior (pandas):
+        - String concatenation with columns in a DataFrame.
+        - Returned a new DataFrame.
+
+        Updated behavior (SQL):
+        - Build projected columns using SQL string concatenation.
+        - Return a new Select with the same output column names:
+          ["MovieID","SubjectURI","RelationshipURI","ObjectURI","Abstract"].
+
+        Args:
+            RDF (Select): current dataset
+
+        Returns:
+            Select: projected dataset with tokenized SubjectURI/RelationshipURI/ObjectURI
+        """
+        sc = RDF.selected_columns
+        subj_tok = literal(SpecialToken.SUBJECT.value) + sc.SubjectURI
+        rel_tok = literal(SpecialToken.RELATIONSHIP.value) + sc.RelationshipURI
+        obj_tok = literal(SpecialToken.OBJECT.value) + sc.ObjectURI
+
+        return RDF.with_only_columns(
+            sc.MovieID.label("MovieID"),
+            subj_tok.label("SubjectURI"),
+            rel_tok.label("RelationshipURI"),
+            obj_tok.label("ObjectURI"),
+            sc.Abstract.label("Abstract"),
+        )
+
+    # -------------------------------------------------------------------------
+    # NA/empty drop on key columns (SubjectURI, RelationshipURI, ObjectURI)
+    # -------------------------------------------------------------------------
+    def drop_na_from_dataset(self, RDF: Select) -> Select:
+        """
+        Dataset has SubjectURI, RelationshipURI, ObjectURI. We want to drop rows
+        where any of these is empty or NULL.
+
+        Original behavior (pandas):
+        - Replace '' with NaN and dropna on the three columns.
+
+        Updated behavior (SQL):
+        - Apply WHERE clauses checking for NOT NULL and not empty string.
+
+        Args:
+            RDF (Select): current dataset
+
+        Returns:
+            Select: dataset filtered to non-empty SubjectURI/RelationshipURI/ObjectURI
+        """
+        sc = RDF.selected_columns
+        return RDF.where(
+            (sc.SubjectURI.is_not(None)) & (sc.SubjectURI != "") &
+            (sc.RelationshipURI.is_not(None)) & (sc.RelationshipURI != "") &
+            (sc.ObjectURI.is_not(None)) & (sc.ObjectURI != "")
+        )
+
+    # -------------------------------------------------------------------------
+    # Rebuild by movie (one row per movie)
+    # -------------------------------------------------------------------------
+    def rebuild_by_movie(self, RDF: Select) -> Select:
+        """
+        This method conceptually assumes iteration by movie_id, because by design
+        we want one row per movie at the end.
+
+        Original behavior (pandas):
+        - Build per-row "Triple" as SubjectURI + RelationshipURI + ObjectURI,
+          wrapped with START_TRIPLE/END_TRIPLE.
+        - Group by ["MovieID", "Abstract"] and join ("".join) all Triple strings into one.
+        - Prefix the whole list with START_TRIPLE_LIST and Abstract with ABSTRACT.
+        - Return DataFrame [["MovieID","Triple","Abstract"]].
+
+        Updated behavior (SQL):
+        - Build per-row Triple using SQL string concatenation and constants.
+        - Use GROUP_CONCAT (empty separator) to aggregate per-movie.
+        - Prefix with START_TRIPLE_LIST and ABSTRACT in SQL.
+        - Return a Select with columns: ["MovieID","Triple","Abstract"].
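+        - Note: SQLite's GROUP_CONCAT gives no ordering guarantee within a group;
+          wrap the row view in an ORDER BY subquery if triple order must be stable.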
+
+        Args:
+            RDF (Select): current dataset with columns
+                          MovieID, SubjectURI, RelationshipURI, ObjectURI, Abstract
+
+        Returns:
+            Select: aggregated dataset with one row per movie
+        """
+        sc = RDF.selected_columns
+
+        # Per-row triple with START/END_TRIPLE tokens
+        row_triple = (
+            literal(SpecialToken.START_TRIPLE.value)
+            + (sc.SubjectURI + sc.RelationshipURI + sc.ObjectURI)
+            + literal(SpecialToken.END_TRIPLE.value)
+        ).label("Triple")
+
+        # Prefixed abstract
+        abstract_tok = (literal(SpecialToken.ABSTRACT.value) + sc.Abstract).label("Abstract")
+
+        # Subquery of per-row triples / abstracts
+        row_view = RDF.with_only_columns(
+            sc.MovieID.label("MovieID"),
+            row_triple,
+            abstract_tok,
+        ).subquery()
+
+        # Concatenate all triples for each movie (SQLite syntax; adjust for other DBs)
+        triple_concat = (
+            literal(SpecialToken.START_TRIPLE_LIST.value)
+            + func.group_concat(row_view.c.Triple, literal(""))
+        ).label("Triple")
+
+        return (
+            select(
+                row_view.c.MovieID.label("MovieID"),
+                triple_concat,
+                row_view.c.Abstract.label("Abstract"),
+            )
+            .group_by(row_view.c.MovieID, row_view.c.Abstract)
+        )
+
+    # -------------------------------------------------------------------------
+    # Build triple(s) projection
+    # -------------------------------------------------------------------------
+    @staticmethod
+    def build_triple(RDF: Select) -> Select:
+        """
+        Obtains the joined RDF triple in one element, together with START and END special tokens.
+
+        Original behavior (pandas):
+        - Returned a Series/DataFrame column "Triple" built from three string columns.
+
+        Updated behavior (SQL):
+        - Returns a Select with a single column "Triple" built in SQL.
+
+        Args:
+            RDF (Select): at least columns ["SubjectURI", "RelationshipURI", "ObjectURI"]
+
+        Returns:
+            Select: a projection containing one column named "Triple"
+        """
+        sc = RDF.selected_columns
+        triple = (
+            literal(SpecialToken.START_TRIPLE.value)
+            + (sc.SubjectURI + sc.RelationshipURI + sc.ObjectURI)
+            + literal(SpecialToken.END_TRIPLE.value)
+        ).label("Triple")
+        return RDF.with_only_columns(triple)
+
+    @staticmethod
+    def build_incomplete_triple(RDF: Select) -> Select:
+        """
+        Method helper used for the third task: "Predicting a masked component within an RDF triple".
+        Obtains the joined RDF triple in one element, together with START and END special tokens.
+        The MISSING element will be replaced by the MASK special token.
+
+        Original behavior (pandas):
+        - Created a Series "Triple" using fallback values for missing columns.
+
+        Updated behavior (SQL):
+        - Uses COALESCE to replace NULLs with the MASK token directly in SQL.
+        - Returns a Select with a single column "Triple".
+
+        Args:
+            RDF (Select): 2 of the following columns present ["SubjectURI", "RelationshipURI", "ObjectURI"]
+
+        Returns:
+            Select: projection with column "Triple"
+        """
+        sc = RDF.selected_columns
+        mask = literal(SpecialToken.MASK.value)
+
+        triple = (
+            literal(SpecialToken.START_TRIPLE.value)
+            + (func.coalesce(sc.SubjectURI, mask)
+               + func.coalesce(sc.RelationshipURI, mask)
+               + func.coalesce(sc.ObjectURI, mask))
+            + literal(SpecialToken.END_TRIPLE.value)
+        ).label("Triple")
+        return RDF.with_only_columns(triple)
+
+    @staticmethod
+    def build_for_mask_task(RDF_incomplete: Select, MISSING) -> None:
+        """
+        Currently not used.
+
+        Original intention:
+            Given two DataFrames (one incomplete RDF and another with just the missing component),
+            apply special tokens accordingly.
+
+        Updated note:
+            This stub remains for API parity.
+            If needed in the future, it can be implemented as a Select-building
+            helper that merges/COALESCEs columns from different selects.
+        """
+        return None
diff --git a/Scripts/DataCleaning/legacy/fast_filter.py b/Scripts/DataCleaning/legacy/fast_filter.py
new file mode 100644
index 0000000..4aa0798
--- /dev/null
+++ b/Scripts/DataCleaning/legacy/fast_filter.py
@@ -0,0 +1,148 @@
+# This module removes unwanted relationships from the pipeline according to different rules
+import pandas as pd
+import sqlite3  # kept for compatibility
+import numpy as np
+
+from Scripts.Libs.CleaningPipeline.special_token import SpecialToken
+from Scripts.Libs.CleaningPipeline.sql_endpoint import SqlEndpoint
+
+
+class PipelineApplier:
+    def __init__(self):
+        # Fast internal caches for O(1) membership checks
+        self._MOVIE_FILTER_SET = set()
+        self._REL_FILTER_SET = set()
+
+    # ------------------------------
+    # Filters
+    # ------------------------------
+    def delete_relationship_by_str(self, RDF: pd.DataFrame, uri: str) -> pd.DataFrame:
+        # Vectorized boolean mask
+        return RDF.loc[RDF["RelationshipURI"] != uri]
+
+    def generate_frequency_movie_filter(self, MOVIE_COUNT: pd.DataFrame, min_threshold: int, max_threshold: int):
+        """
+        You MUST call this before filtering the dataset by movie frequency [filter_by_frequency_movie_id()],
+        since this method creates that filter.
+        Args:
+            MOVIE_COUNT (pd.DataFrame): ["MovieID","Count"]
+        """
+        sel = (MOVIE_COUNT["Count"] >= min_threshold) & (MOVIE_COUNT["Count"] < max_threshold)
+        self._MOVIE_FILTER_SET = set(MOVIE_COUNT.loc[sel, "MovieID"].tolist())
+
+    def generate_frequency_relationship_filter(self, REL_COUNT: pd.DataFrame, min_threshold: int, max_threshold: int):
+        sel = (REL_COUNT["Count"] >= min_threshold) & (REL_COUNT["Count"] < max_threshold)
+        self._REL_FILTER_SET = set(REL_COUNT.loc[sel, "RelationshipURI"].tolist())
+
+    def filter_by_frequency_movie_id(self, RDF: pd.DataFrame) -> pd.DataFrame:
+        # Set-backed isin is the fastest path
+        return RDF.loc[RDF["MovieID"].isin(self._MOVIE_FILTER_SET)]
+
+    def filter_by_frequency_relationship(self, RDF: pd.DataFrame) -> pd.DataFrame:
+        return RDF.loc[RDF["RelationshipURI"].isin(self._REL_FILTER_SET)]
+
+    # ------------------------------
+    # Cleaning & preprocessing
+    # ------------------------------
+    def rdf_add_special_token(self, RDF: pd.DataFrame) -> pd.DataFrame:
+        """
+        Adds RDF special token to SubjectURI / RelationshipURI / ObjectURI.
+        Returns a new DataFrame (no inplace modification of the caller's object).
+        """
+        # np.char.* operates on fixed-width string arrays and rejects object
+        # arrays, so convert with dtype=str (assumes NA rows were already dropped)
+        subj = np.char.add(SpecialToken.SUBJECT.value, RDF["SubjectURI"].to_numpy(dtype=str))
+        rel = np.char.add(SpecialToken.RELATIONSHIP.value, RDF["RelationshipURI"].to_numpy(dtype=str))
+        obj = np.char.add(SpecialToken.OBJECT.value, RDF["ObjectURI"].to_numpy(dtype=str))
+        return RDF.assign(SubjectURI=subj, RelationshipURI=rel, ObjectURI=obj)
+
+    def drop_na_from_dataset(self, RDF: pd.DataFrame) -> pd.DataFrame:
+        """
+        Replace '' with NaN only on key columns, then drop rows missing any of them.
+        """
+        cols = ["SubjectURI", "RelationshipURI", "ObjectURI"]
+        rdf = RDF.copy()
+        for c in cols:
+            m = rdf[c] == ""
+            if m.any():
+                rdf.loc[m, c] = np.nan
+        return rdf.dropna(subset=cols)
+
+    # ------------------------------
+    # Building triples
+    # ------------------------------
+    @staticmethod
+    def build_triple(RDF: pd.DataFrame):
+        """
+        Obtains the joined RDF triple in one element, together with START and END special tokens.
+        Returns:
+            pd.Series: RDF["Triple"] (just this column). Side-effect: sets RDF["Triple"].
+ """ + start = SpecialToken.START_TRIPLE.value + end = SpecialToken.END_TRIPLE.value + + subj = RDF["SubjectURI"].to_numpy(dtype=object) + rel = RDF["RelationshipURI"].to_numpy(dtype=object) + obj = RDF["ObjectURI"].to_numpy(dtype=object) + + arr = np.char.add(np.char.add(np.char.add(start, subj), + np.char.add(rel, obj)), + end) + RDF["Triple"] = pd.Series(arr, index=RDF.index, dtype=object, name="Triple") + return RDF["Triple"] + + @staticmethod + def build_incomplete_triple(RDF: pd.DataFrame): + """ + Helper used for the third task: "Predicting a masked component within an RDF triple". + Accepts any subset of ["SubjectURI","RelationshipURI","ObjectURI"] (typically 2 of 3). + Missing components are replaced by . + Returns: + pd.Series: RDF["Triple"] (just this column). Side-effect: sets RDF["Triple"]. + """ + start = SpecialToken.START_TRIPLE.value + end = SpecialToken.END_TRIPLE.value + maskv = SpecialToken.MASK.value + n = len(RDF.index) + + subj = RDF["SubjectURI"].to_numpy(dtype=object) if "SubjectURI" in RDF else np.full(n, maskv, dtype=object) + rel = RDF["RelationshipURI"].to_numpy(dtype=object) if "RelationshipURI" in RDF else np.full(n, maskv, dtype=object) + obj = RDF["ObjectURI"].to_numpy(dtype=object) if "ObjectURI" in RDF else np.full(n, maskv, dtype=object) + + arr = np.char.add(np.char.add(np.char.add(start, subj), + np.char.add(rel, obj)), + end) + RDF["Triple"] = pd.Series(arr, index=RDF.index, dtype=object, name="Triple") + return RDF["Triple"] + + def rebuild_by_movie(self, RDF: pd.DataFrame): + """ + Collapse triples + abstract into a single row per movie. + Returns: ["MovieID","Triple","Abstract"] + """ + # Build triples once (vectorized); method also sets RDF["Triple"] + triples = self.build_triple(RDF) + + # Minimal frame for grouping (avoid carrying extra columns) + tmp = pd.DataFrame({ + "MovieID": RDF["MovieID"].to_numpy(), + "Abstract": RDF["Abstract"].to_numpy(), + "Triple": triples.to_numpy(), + }) + + # Factorize high-cardinality keys to fast integer codes, group on codes, + # then map back to labels; sum concatenates strings for object dtype. + mid_codes, mid_uniques = pd.factorize(tmp["MovieID"], sort=False) + abs_codes, abs_uniques = pd.factorize(tmp["Abstract"], sort=False) + + tmp["_mid"] = mid_codes + tmp["_abs"] = abs_codes + + grouped = tmp.groupby(["_mid", "_abs"], sort=False, as_index=False)["Triple"].sum() + + grouped["MovieID"] = grouped["_mid"].map(lambda i: mid_uniques[i]) + grouped["Abstract"] = grouped["_abs"].map(lambda i: abs_uniques[i]) + + # Final tokens + grouped["Triple"] = SpecialToken.START_TRIPLE_LIST.value + grouped["Triple"] + grouped["Abstract"] = SpecialToken.ABSTRACT.value + grouped["Abstract"] + + return grouped[["MovieID", "Triple", "Abstract"]] diff --git a/Scripts/DataCleaning/filter.py b/Scripts/DataCleaning/legacy/filter.py similarity index 99% rename from Scripts/DataCleaning/filter.py rename to Scripts/DataCleaning/legacy/filter.py index 592d628..158d2f0 100644 --- a/Scripts/DataCleaning/filter.py +++ b/Scripts/DataCleaning/legacy/filter.py @@ -26,6 +26,7 @@ class PipelineApplier(): """Remove rows whose RelationshipURI is in the stored filter. 
        Generate it first callig the generate_list_relationship_filter"""
         return RDF[~RDF["RelationshipURI"].isin(self.relationship_filter_list)]
 
+    # def filter_movie_by_rel_uri_frequence()
 
     def generate_frequency_movie_filter(self, MOVIE_COUNT: pd.DataFrame ,min_treshold: int, max_treshold: int):
         """
diff --git a/Scripts/DataCleaning/pipeline.py b/Scripts/DataCleaning/legacy/pipeline.py
similarity index 95%
rename from Scripts/DataCleaning/pipeline.py
rename to Scripts/DataCleaning/legacy/pipeline.py
index f0a2169..ab5f18c 100644
--- a/Scripts/DataCleaning/pipeline.py
+++ b/Scripts/DataCleaning/legacy/pipeline.py
@@ -1,6 +1,6 @@
 import re
 from Scripts.Libs.CleaningPipeline.sql_endpoint import SqlEndpoint
-from Scripts.DataCleaning.filter import PipelineApplier
+from Scripts.DataCleaning.legacy.filter import PipelineApplier
 # tasks dataset builder
 from Scripts.DataCleaning.data_output_models.rdf_mask_task import RDF_mask_task_dataset
 from Scripts.DataCleaning.data_output_models.bpe_corpus import BPE_corpus
@@ -25,13 +25,16 @@ class Pipeline():
         MOVIE_COUNT = self.sql_endpoint.get_movies_id_count()
         REL_COUNT = self.sql_endpoint.get_relationship_count()
         self.filter_applier.generate_frequency_movie_filter(MOVIE_COUNT,50,3000)
-        self.filter_applier.generate_frequency_relationship_filter(REL_COUNT, 50, 2395627)
+        self.filter_applier.generate_frequency_relationship_filter(REL_COUNT, 50, 2395627) # from 2718 to 3069
 
         # prepare the filter on the relationshipURI you want to delete:
         relationship_uri_banned_list = [
             "dbp-dbp:wikiPageUsesTemplate","w3:2000/01/rdf-schema#label","dbp-dbo:abstract",
             "dbp-dbo:wikiPageID","dbp-dbo:wikiPageRevisionID", "dbp-dbo:wikiPageDisambiguates",
             "w3:2002/07/owl#sameAs","dbp-dbp:image","dbp-dbo:wikiPageLength", "w3:2000/01/rdf-schema#comment",
-            "dbp-dbo:thumbnail", "foaf:depiction", "w3:1999/02/22-rdf-syntax-ns#type"]
+            "dbp-dbo:thumbnail", "foaf:depiction", "w3:1999/02/22-rdf-syntax-ns#type",
+            "dbp-dbp:id","dbp-dbp:totalWidth", "w3:ns/prov#wasDerivedFrom", "dbp-dbp:n", "dbp-dbp:alt",
+            "dbp-dbo:soundRecording"
+            ]
 
         self.filter_applier.generate_list_relationship_filter(relationship_uri_banned_list)
diff --git a/Scripts/DataCleaning/pipeline/cleaner.py b/Scripts/DataCleaning/pipeline/cleaner.py
new file mode 100644
index 0000000..812d757
--- /dev/null
+++ b/Scripts/DataCleaning/pipeline/cleaner.py
@@ -0,0 +1,86 @@
+# This module removes unwanted relationships from the pipeline according to different rules
+import pandas as pd
+import sqlite3
+import numpy as np
+
+from Scripts.Libs.CleaningPipeline.special_token import SpecialToken
+from Scripts.Libs.CleaningPipeline.sql_endpoint import SqlEndpoint
+
+
+class PipelineApplier():
+
+    def __init__(self):
+        pass
+
+    def rdf_add_special_token(self, RDF: pd.DataFrame):
+        """
+        Adds the RDF special token to each element of the tuple, i.e.: SUBJ to SubjectURI, OBJ to ObjectURI, REL to RelationshipURI.
+        Check Scripts/Libs/CleaningPipeline/special_token.py for the up-to-date special tokens.
+        It only adds the special tokens of the three elements of the RDF, no other special token.
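+        Example (illustrative): a SubjectURI value "dbp-dbr:Inception" becomes
+        SpecialToken.SUBJECT.value + "dbp-dbr:Inception".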
+        Args:
+            RDF (pd.DataFrame):
+        Returns:
+            pd.DataFrame: ["MovieID","SubjectURI","RelationshipURI","ObjectURI","Abstract"]
+        """
+        # if a filter that ran earlier sliced the RDF, we may hold a view; copying
+        # avoids pandas' SettingWithCopyWarning on the assignments below
+        RDF = RDF.copy()
+        # at the beginning of SubjectURI RelationshipURI ObjectURI, add their special token
+        RDF["SubjectURI"] = SpecialToken.SUBJECT.value + RDF["SubjectURI"]
+        RDF["ObjectURI"] = SpecialToken.OBJECT.value + RDF["ObjectURI"]
+        RDF["RelationshipURI"] = SpecialToken.RELATIONSHIP.value + RDF["RelationshipURI"]
+        return RDF
+
+
+    def drop_na_from_dataset(self, RDF: pd.DataFrame) -> pd.DataFrame:
+        RDF = RDF.replace('', np.nan)
+        # Drop rows where any of the key columns are NaN
+        RDF = RDF.dropna(subset=["SubjectURI", "RelationshipURI", "ObjectURI"])
+        return RDF
+
+    def rebuild_by_movie(self, RDF: pd.DataFrame) -> pd.DataFrame:
+        """
+        Args:
+            RDF (pd.DataFrame): ["MovieID","SubjectURI","RelationshipURI","ObjectURI","Abstract"]
+
+        Returns:
+            pd.DataFrame: ["MovieID","Triple","Abstract"]
+        """
+        # to execute this method you have to have iterated by movie_id,
+        # because by design we want one row per movie at the end
+        # MovieID and Abstract can be given as input for a more generic method
+        # first let's combine each row, creating column Triple as the join of the RDF
+        RDF["Triple"] = RDF["SubjectURI"] + RDF["RelationshipURI"] + RDF["ObjectURI"]
+        # special token
+        RDF["Triple"] = SpecialToken.START_TRIPLE.value + RDF["Triple"] + SpecialToken.END_TRIPLE.value
+        # combine rows into one
+        # MovieID and Abstract map 1 <-> 1 to each other
+        RDF = RDF.groupby(["MovieID", "Abstract"])["Triple"].apply("".join).reset_index()
+        # add special tokens for: start of triple list, end of sentence and start of abstract
+        RDF["Triple"] = SpecialToken.START_TRIPLE_LIST.value + RDF["Triple"] + SpecialToken.END_OF_SENTENCE.value
+        RDF["Abstract"] = SpecialToken.ABSTRACT.value + RDF["Abstract"] + SpecialToken.END_OF_SENTENCE.value
+        return RDF[["MovieID","Triple","Abstract"]]
+
+
+    @staticmethod
+    def build_triple(RDF: pd.DataFrame):
+        """
+        Obtains the joined RDF triple in one element, together with START and END special tokens
+        Args:
+            RDF (pd.DataFrame): at least ["SubjectURI", "RelationshipURI", "ObjectURI"]
+        Returns:
+            pd.DataFrame: RDF["Triple"] (just this column)
+        """
+        # let's combine each row, creating column Triple as the join of the RDF
+        RDF["Triple"] = RDF["SubjectURI"] + RDF["RelationshipURI"] + RDF["ObjectURI"]
+        # special token
+        RDF["Triple"] = SpecialToken.START_TRIPLE.value + RDF["Triple"] + SpecialToken.END_TRIPLE.value
+        return RDF["Triple"]
+
+
+    def regex_on_objects(self, RDF: pd.DataFrame) -> pd.DataFrame:
+        RDF["ObjectURI"] = (RDF["ObjectURI"].astype("string")
+                            .str.replace(r"\r?\n+", ", ", regex=True)  # newlines -> ", "
+                            .str.replace(r"\*", "", regex=True))       # delete all asterisks
+
+        return RDF
\ No newline at end of file
diff --git a/Scripts/DataCleaning/pipeline/movie_filter.py b/Scripts/DataCleaning/pipeline/movie_filter.py
new file mode 100644
index 0000000..6fc3ecc
--- /dev/null
+++ b/Scripts/DataCleaning/pipeline/movie_filter.py
@@ -0,0 +1,103 @@
+import pandas as pd
+from Scripts.Libs.CleaningPipeline.sql_endpoint import SqlEndpoint
+
+class MovieFilter:
+
+    def __init__(self) -> None:
+        self.sql_endpoint = SqlEndpoint()
+        # first obtain all movie_id
+        movie_query = "SELECT MovieID FROM Movies"
+        self.MOVIE_FILTER = self.sql_endpoint.get_dataframe_from_query(movie_query)
+
+
+    def frequency_filter(self, min_threshold: int, max_threshold: int):
+        movie_list_placeholder = ",".join(["?"] * len(self.MOVIE_FILTER))
+
+        filter_query = f"""
+        SELECT MovieID
+        FROM RDFs
+        WHERE MovieID IN ({movie_list_placeholder})
+        GROUP BY MovieID
+        HAVING COUNT(*) BETWEEN ? AND ?;
+        """
+        # bind the thresholds instead of interpolating them into the SQL string
+        params = tuple(self.MOVIE_FILTER["MovieID"].to_list()) + (min_threshold, max_threshold)
+        self.MOVIE_FILTER = self.sql_endpoint.get_dataframe_from_query(filter_query, params)
+
+
+    def get_movie_id(self):
+        return self.MOVIE_FILTER
+
+
+    def relation_filter(self, parsed_rel_uri: str, min_threshold: int, max_threshold: int):
+        movie_ids = self.MOVIE_FILTER["MovieID"].to_list()
+        movie_list_placeholder = ",".join(["?"] * len(movie_ids))
+
+        filter_query = f"""
+        SELECT MovieID
+        FROM RDFs
+        JOIN ParsedRelationships ON ParsedRelationships.RelationshipID = RDFs.RelationshipID
+        WHERE MovieID IN ({movie_list_placeholder})
+        GROUP BY MovieID
+        HAVING SUM(CASE WHEN ParsedRelationships.RelationshipURI = ? THEN 1 ELSE 0 END)
+               BETWEEN ? AND ?;
+        """
+
+        # the URI and thresholds are bound parameters as well
+        params = tuple(movie_ids) + (parsed_rel_uri, min_threshold, max_threshold)
+        self.MOVIE_FILTER = self.sql_endpoint.get_dataframe_from_query(filter_query, params)
+
+
+    def filter_by_director(self):
+        director_list = ['dbp-dbo:director','dbp-dbp:director']
+
+        movie_ids = self.MOVIE_FILTER["MovieID"].to_list()
+        movie_list_placeholder = ",".join(["?"] * len(movie_ids))
+
+        filter_query = f"""
+        SELECT DISTINCT RDFs.MovieID
+        FROM RDFs
+        JOIN ParsedRelationships USING (RelationshipID)
+        WHERE RDFs.MovieID IN ({movie_list_placeholder})
+          AND ParsedRelationships.RelationshipURI IN {tuple(director_list)};
+        """
+
+        params = tuple(movie_ids)
+        self.MOVIE_FILTER = self.sql_endpoint.get_dataframe_from_query(filter_query, params)
+
+
+    def filter_by_english_movies(self):
+        movie_ids = self.MOVIE_FILTER["MovieID"].to_list()
+        movie_list_placeholder = ",".join(["?"] * len(movie_ids))
+
+        relationship = ["dbp-dbp:language"]
+        objects_list = ["English", "dbp-dbr:English_language"]
+
+        # looser variant (kept for reference, not executed): any English-language triple
+        filter_query = f"""
+        SELECT DISTINCT RDFs.MovieID
+        FROM RDFs
+        INNER JOIN ParsedRelationships USING (RelationshipID)
+        INNER JOIN ParsedObjects USING (ObjectID)
+        WHERE RDFs.MovieID IN ({movie_list_placeholder})
+          AND ParsedRelationships.RelationshipURI IN ('{relationship[0]}')
+          AND ParsedObjects.ObjectURI in {tuple(objects_list)};
+        """
+
+        # stricter variant (used below): at least one English object and no non-English one
+        other_query = f"""
+        SELECT RDFs.MovieID
+        FROM RDFs
+        INNER JOIN ParsedRelationships USING (RelationshipID)
+        INNER JOIN ParsedObjects USING (ObjectID)
+        WHERE RDFs.MovieID IN ({movie_list_placeholder})
+          AND ParsedRelationships.RelationshipURI IN ('{relationship[0]}')
+        GROUP BY RDFs.MovieID
+        HAVING
+            SUM(CASE WHEN ParsedObjects.ObjectURI IN {tuple(objects_list)} THEN 1 ELSE 0 END) >= 1
+            AND
+            SUM(CASE WHEN ParsedObjects.ObjectURI NOT IN {tuple(objects_list)} THEN 1 ELSE 0 END) = 0;
+        """
+
+        params = tuple(movie_ids)
+        self.MOVIE_FILTER = self.sql_endpoint.get_dataframe_from_query(other_query, params)
+
+
+
+# movie_filter = MovieFilter()
+# movie_filter.frequency_filter(5,10)
\ No newline at end of file
diff --git a/Scripts/DataCleaning/pipeline/pipeline.py b/Scripts/DataCleaning/pipeline/pipeline.py
new file mode 100644
index 0000000..349a859
--- /dev/null
+++ b/Scripts/DataCleaning/pipeline/pipeline.py
@@ -0,0 +1,155 @@
+from movie_filter import MovieFilter
+from relationship_filter import RelationshipFilter
+from rdf_filter import RdfFilter
+from cleaner import PipelineApplier
+
+from Scripts.DataCleaning.data_output_models.bpe_corpus import BPE_corpus
+from Scripts.DataCleaning.data_output_models.rdf_text_tasks import RDF_text_task_dataset
+from Scripts.DataCleaning.data_output_models.rdf_completation_task import RDF_completation_task_dataset
+from Scripts.DataCleaning.data_output_models.debug_csv import Debug_csv
+
+import pandas as pd
+
+RELATIONSHIP_FILTER_LIST = [
+    "dbp-dbp:wikiPageUsesTemplate","w3:2000/01/rdf-schema#label","dbp-dbo:abstract",
+    "dbp-dbo:wikiPageID","dbp-dbo:wikiPageRevisionID", "dbp-dbo:wikiPageDisambiguates",
+    "w3:2002/07/owl#sameAs","dbp-dbp:image","dbp-dbo:wikiPageLength", "w3:2000/01/rdf-schema#comment",
+    "dbp-dbo:thumbnail", "foaf:depiction", "w3:1999/02/22-rdf-syntax-ns#type",
+    "dbp-dbp:id","dbp-dbp:totalWidth", "w3:ns/prov#wasDerivedFrom", "dbp-dbp:n", "dbp-dbp:alt",
+    "dbp-dbo:soundRecording", "dbp-dbp:align", "dbp-dbp:format",
+    "dbp-dbp:filename", "dbp-dbp:wikt", "foaf:isPrimaryTopicOf", "dbp-dbp:quote", "foaf:homepage",
+    "dbp-dbp:wordnet_type", "dbp-dbp:length","dbp-dbp:caption", "dbp-dbo:imdbId", "dbp-dbp:border", "dbp-dbp:note",
+    "dbp-dbp:postalCodeType", "dbp-dbp:extraColumn", "dbp-dbp:bgcolor","dbp-dbp:prevTitle",
+    "dbp-dbp:imageUpright", "dbp-dbp:url", "dbp-dbp:italicTitle", "dbp-dbp:imageSize", "dbp-dbp:text",
+    "dbp-dbp:captionAlign", "dbp-dbp:headerAlign", "dbp-dbp:height", "dbp-dbp:link", "dbp-dbo:wikiPageInterLanguageLink",
+    "w3:2003/01/geo/wgs84_pos#lat", "w3:2003/01/geo/wgs84_pos#long", "http://www.georss.org/georss/point",
+    "dbp-dbp:mc", "dbp-dbp:rev3score", "dbp-dbp:rev4score", "dbp-dbp:imageAlt",
+    "dbp-dbp:b", "dbp-dbp:s", "dbp-dbp:c", "dbp-dbp:d", "dbp-dbp:m", "dbp-dbp:v", "dbp-dbp:mw", "dbp-dbp:fontsize",
+    "dbp-dbp:salign", "dbp-dbp:q", "dbp-dbp:portal", "dbp-dbp:dSearch", "dbp-dbp:header", "w3:2003/01/geo/wgs84_pos#geometry",
+    "dbp-dbp:shortsummary", "dbp-dbp:fixAttempted", "dbp-dbo:developer", "dbp-dbp:no", "dbp-dbp:ref", "dbp-dbp:infoa",
+    "dbp-dbp:infob", "dbp-dbp:1a", "dbp-dbp:1p", "dbp-dbp:2a", "dbp-dbp:2p", "http://rdvocab.info/RDARelationshipsWEMI/manifestationOfWork",
+    "dbp-dbp:isbn", "dbp-dbp:titleWidth", "dbp-dbp:prodcode", "dbp-dbp:page", "w3:2004/02/skos/core#closeMatch",
+    "dbp-dbp:colwidth", "dbp-dbp:imagesize", "dbp-dbp:rr", "dbp-dbp:date", "dbp-dbp:type", "dbp-dbp:list",
+    "dbp-dbp:listEpisodes", "dbp-dbp:footerAlign", "dbp-dbp:float", "dbp-dbp:bot", "dbp-dbp:p", "dbp-dbp:l", "dbp-dbp:t", "dbp-dbp:j",
+    "dbp-dbp:1y", "dbp-dbp:2y", "dbp-dbp:1pp", "dbp-dbp:vgs", "dbp-dbp:3a", "dbp-dbp:3p", "dbp-dbp:3y", "dbp-dbp:4a", "dbp-dbp:4y",
+    "dbp-dbp:website"
+    ]
+
+RELATIONSHIP_WHITE_LIST = [
+    "dbp-dbp:director","dbp-dbo:starring", "dbp-dbo:writer", "dbp-dbp:name", "dbp-dbp:genre", "purl:dc/terms/subject"
+    ]
+
+class Pipeline():
+
+    def __init__(self) -> None:
+        self._movie_filter = MovieFilter()
+        self._relationship_filter = RelationshipFilter()
+        self._rdf_filter = RdfFilter()
+        self._pipeline = PipelineApplier()
+
+        self.task_bpe_corpus = BPE_corpus("./Assets/Dataset/Tmp/corpus.txt")
+        self.task_rdf_text = RDF_text_task_dataset("./Assets/Dataset/Tmp/rdf_text.csv")
+        self.task_rdf_completation = RDF_completation_task_dataset("./Assets/Dataset/Tmp/rdf_completation.csv")
+
+        self._movie_filter.frequency_filter(50,3000)
+        self._relationship_filter.frequency_filter(25, 2395627) # from 2718 to 3069
+        self._relationship_filter.delete_relationship_uri_by_list(RELATIONSHIP_FILTER_LIST)
+
+    def other_filter(self):
+        self._movie_filter.relation_filter("purl:dc/terms/subject",5,100)
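+        # each call below reassigns MOVIE_FILTER to a subset of itself, so the
+        # filters compose as set intersections: their order does not change the
+        # final result, only the intermediate sizes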
+        self._movie_filter.filter_by_director()
+        self._movie_filter.filter_by_english_movies()
+        self._movie_filter.relation_filter("dbp-dbp:budget",1,100) # the most important films carry a budget relationship
+        self._movie_filter.relation_filter("dbp-dbp:released",1,100) # cuts the set down to roughly 2000 movies
+
+    def _get_cleaned_movie_rows(self):
+        movie_ids = self._movie_filter.get_movie_id()
+        rel_ids = self._relationship_filter.get_relationship_id()
+        # rel_ids = self._relationship_filter.get_relationship_id_from_white_list(RELATIONSHIP_WHITE_LIST)
+
+        for RDF in self._rdf_filter.yield_movie_abbreviated_rdfs(movie_ids,rel_ids):
+            RDF = self._pipeline.drop_na_from_dataset(RDF)
+            RDF = self._pipeline.regex_on_objects(RDF)
+            RDF = self._pipeline.rdf_add_special_token(RDF)
+
+            if RDF.empty:
+                continue
+            yield RDF
+
+
+    def execute_task_bpe_corpus(self):
+        for RDF in self._get_cleaned_movie_rows():
+            RDF = self._pipeline.rebuild_by_movie(RDF)
+            RDF = RDF[["Triple","Abstract"]]
+            self.task_bpe_corpus.write_from_df(RDF)
+        self._end_file_handler()
+
+
+    def execute_tasks_rdf_text(self):
+        for RDF in self._get_cleaned_movie_rows():
+            RDF = self._pipeline.rebuild_by_movie(RDF)
+            self.task_rdf_text.write(RDF)
+        self._end_file_handler()
+
+
+    def execute_task_rdf_completation(self):
+        for RDF in self._get_cleaned_movie_rows():
+            RDF["Triple"] = self._pipeline.build_triple(RDF)
+            self.task_rdf_completation.write(RDF[["MovieID","Triple"]])
+        self._end_file_handler()
+
+
+    def _end_file_handler(self):
+        self.task_bpe_corpus.close()
+        self.task_rdf_text.close()
+        self.task_rdf_completation.close()
+
+
+    def execute_all_task(self):
+        for RDF in self._get_cleaned_movie_rows():
+            completation_RDF = RDF.copy()
+            completation_RDF["Triple"] = self._pipeline.build_triple(completation_RDF)
+            self.task_rdf_completation.write(completation_RDF[["MovieID","Triple"]])
+
+            RDF = self._pipeline.rebuild_by_movie(RDF)
+
+            self.task_rdf_text.write(RDF)
+            self.task_bpe_corpus.write_from_df(RDF[["Triple","Abstract"]])
+
+        self._end_file_handler()
+
+
+    def use_toy_dataset(self):
+        # CHOSEN MOVIES:
+        # The Dark Knight  : 117248
+        # Inception        : 147074
+        # The Avengers     : 113621
+        # Cast Away        : 1123
+        # The Departed     : 117586
+        # American Psycho  : 90177
+        # Avatar           : 71587
+        # Django Unchained : 138952
+        # Spirited Away    : 144137
+        # Knives Out       : 148025
+        # [106465,106466,106467,106468,106469,106470,106471,106472,106473]
+        movie_list = [117248, 147074, 113621, 1123, 117586, 90177, 71587, 138952, 144137, 148025]
+        self._movie_filter.MOVIE_FILTER = pd.DataFrame({"MovieID": movie_list})
+
+    def generate_csv_debug_file(self, debug_path:str):
+        debug_csv = Debug_csv(debug_path)
+
+        for RDF in self._get_cleaned_movie_rows():
+            debug_csv.write(RDF)
+
+        debug_csv.close()
+
+
+pipe = Pipeline()
+#pipe.use_toy_dataset()
+pipe.other_filter()
+# pipe.execute_all_task()
+pipe.generate_csv_debug_file("Assets/Dataset/Tmp/debug.csv")
\ No newline at end of file
diff --git a/Scripts/DataCleaning/pipeline/rdf_filter.py b/Scripts/DataCleaning/pipeline/rdf_filter.py
new file mode 100644
index 0000000..50be597
--- /dev/null
+++ b/Scripts/DataCleaning/pipeline/rdf_filter.py
@@ -0,0 +1,32 @@
+import pandas as pd
+from Scripts.Libs.CleaningPipeline.sql_endpoint import SqlEndpoint
+
+class RdfFilter:
+
+    def __init__(self) -> None:
+        self.sql_endpoint = SqlEndpoint()
+
+
+    # def delete_hypernym_when_movie(self):
+    #     purl:linguistics/gold/hypernym
+    #     is almost always "dbp-dbr:Movie" or "dbp-dbr:Film"
+    #     banned triple
+
+    def yield_movie_abbreviated_rdfs(self, MOVIE_ID: pd.DataFrame, REL_ID: pd.DataFrame):
+        relationship_placeholder = ",".join(["?"] * len(REL_ID))
+
+        param = tuple(REL_ID["RelationshipID"].to_list())
+
+        QUERY = f"""
+        SELECT MovieID, SubjectURI, RelationshipURI, ObjectURI, Abstract
+        FROM RDFs
+        INNER JOIN ParsedSubjects USING (SubjectID)
+        INNER JOIN ParsedRelationships USING (RelationshipID)
+        INNER JOIN ParsedObjects USING (ObjectID)
+        INNER JOIN WikipediaAbstracts USING (MovieID)
+        WHERE MovieID = ? AND RelationshipID IN ({relationship_placeholder});
+        """
+
+        for movie_id in MOVIE_ID["MovieID"].to_list():
+            params = (movie_id,) + param
+            yield self.sql_endpoint.get_dataframe_from_query(QUERY, params=params)
\ No newline at end of file
diff --git a/Scripts/DataCleaning/pipeline/relationship_filter.py b/Scripts/DataCleaning/pipeline/relationship_filter.py
new file mode 100644
index 0000000..c5cd09e
--- /dev/null
+++ b/Scripts/DataCleaning/pipeline/relationship_filter.py
@@ -0,0 +1,54 @@
+import pandas as pd
+from Scripts.Libs.CleaningPipeline.sql_endpoint import SqlEndpoint
+
+class RelationshipFilter:
+
+    def __init__(self) -> None:
+        self.sql_endpoint = SqlEndpoint()
+        # first obtain all relationship_id
+        relationship_query = "SELECT RelationshipID FROM Relationships"
+        self.RELATIONSHIP_FILTER = self.sql_endpoint.get_dataframe_from_query(relationship_query)
+
+
+    def frequency_filter(self, min_threshold: int, max_threshold: int):
+        ids_placeholder = ",".join(["?"] * len(self.RELATIONSHIP_FILTER))
+
+        filter_query = f"""
+        SELECT RelationshipID
+        FROM RDFs
+        WHERE RelationshipID IN ({ids_placeholder})
+        GROUP BY RelationshipID
+        HAVING COUNT(*) BETWEEN ? AND ?;
+        """
+        # bind the thresholds instead of interpolating them into the SQL string
+        params = tuple(self.RELATIONSHIP_FILTER["RelationshipID"].to_list()) + (min_threshold, max_threshold)
+        self.RELATIONSHIP_FILTER = self.sql_endpoint.get_dataframe_from_query(filter_query, params)
+
+
+    def get_relationship_id(self):
+        return self.RELATIONSHIP_FILTER
+
+    def get_relationship_id_from_white_list(self, relationship_list: list[str]):
+        ids_placeholder = ",".join(["?"] * len(self.RELATIONSHIP_FILTER))
+        uri_placeholder = ",".join(["?"] * len(relationship_list))
+        filter_query = f"""
+        SELECT RelationshipID
+        FROM ParsedRelationships
+        WHERE RelationshipID IN ({ids_placeholder})
+          AND RelationshipURI IN ({uri_placeholder});
+        """
+        params = tuple(self.RELATIONSHIP_FILTER["RelationshipID"].to_list()) + tuple(relationship_list)
+        return self.sql_endpoint.get_dataframe_from_query(filter_query, params)
+
+
+
+    def delete_relationship_uri_by_list(self, filter_list: list[str]):
+        ids_placeholder = ",".join(["?"] * len(self.RELATIONSHIP_FILTER))
+        uri_placeholder = ",".join(["?"] * len(filter_list))
+
+        filter_query = f"""
+        SELECT RelationshipID
+        FROM ParsedRelationships
+        WHERE RelationshipID IN ({ids_placeholder})
+          AND RelationshipURI NOT IN ({uri_placeholder});
+        """
+        params = tuple(self.RELATIONSHIP_FILTER["RelationshipID"].to_list()) + tuple(filter_list)
+        self.RELATIONSHIP_FILTER = self.sql_endpoint.get_dataframe_from_query(filter_query, params)
diff --git a/Scripts/Libs/CleaningPipeline/sql_endpoint.py b/Scripts/Libs/CleaningPipeline/sql_endpoint.py
index 66ba1ea..31f9ae4 100644
--- a/Scripts/Libs/CleaningPipeline/sql_endpoint.py
+++ b/Scripts/Libs/CleaningPipeline/sql_endpoint.py
@@ -133,6 +133,11 @@ class SqlEndpoint():
         GROUP BY RelationshipURI;
         """
         return pd.read_sql_query(QUERY, self.sql_engine)
+
+    def get_dataframe_from_query(self, query: str, params=None):
+        if params is None:
+            return pd.read_sql_query(query, self.sql_engine)
+        return pd.read_sql_query(query, 
self.sql_engine, params=params)
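+
+    # Illustrative usage (sketch): parameters are bound by the DBAPI driver
+    # rather than interpolated into the SQL string, e.g.:
+    #   endpoint.get_dataframe_from_query(
+    #       "SELECT MovieID FROM Movies WHERE MovieID IN (?,?)", params=(1, 2))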