Created legacy folder for old pipeline

this pipeline still works but is slower then the new,
some ot its method can be used later
This commit is contained in:
GassiGiuseppe 2025-10-05 14:54:32 +02:00
parent 69fba7c3e9
commit 2bd24ec278
4 changed files with 536 additions and 3 deletions

View File

@ -0,0 +1,381 @@
# This file deletes in the pipeline the unwanted relationship by different rules
# -----------------------------------------------------------------------------
# SQL-FIRST VERSION
# -----------------------------------------------------------------------------
# In the original (pandas) version this module:
# - stored frequency filters in DataFrames,
# - filtered/cleaned DataFrames in-memory,
# - added special tokens via string ops,
# - rebuilt one row per movie using groupby/aggregation.
#
# In this rewrite:
# - Every transformation RETURNS a SQLAlchemy `Select` object instead of a DataFrame.
# - Your pipeline can pass this `Select` (a "dataview") from one stage to the next,
# composing more SQL lazily. Nothing is executed until you call `session.execute(...)`.
# - Frequency filters are represented as SUBSELECTS, applied with `WHERE IN (subquery)`.
#
# Notes:
# - We keep the same CLASS and METHOD NAMES to preserve call sites.
# - Method comments/docstrings from your original file are carried over and updated
# to reflect Select-based behavior and return types.
# - We drop pandas/numpy/sqlite3 imports because filtering is pushed into SQL.
# - `GROUP_CONCAT` is used for the rebuild phase (SQLite-compatible). For other DBs,
# swap with an equivalent string-agg function.
# -----------------------------------------------------------------------------
from __future__ import annotations
from typing import Optional
from sqlalchemy import select, func, literal
from sqlalchemy.sql import Select
from Scripts.Libs.CleaningPipeline.special_token import SpecialToken
class PipelineApplier():
"""
SQL-first pipeline applier.
In the pandas version, frequency filters were stored as DataFrames (self.MOVIE_FILTER / self.REL_FILTER)
and every method worked with/returned pandas.DataFrame. In this SQLAlchemy rewrite:
- self.MOVIE_FILTER and self.REL_FILTER become *subselects* (Select objects) that yield a single
column each (MovieID or RelationshipURI). These subselects can be applied via `WHERE IN (subquery)`.
- Every method that previously returned a DataFrame now returns a *Select* that represents the same
logical transformation, but pushed into the database engine.
- Comments and docstrings are updated to reflect SQL semantics while preserving your original intent.
"""
def __init__(self):
# In the pandas version these were DataFrames storing allowed keys.
# Here they are Select objects (single-column subselects) or None.
# Expected column names:
# - self.MOVIE_FILTER: "MovieID"
# - self.REL_FILTER: "RelationshipURI"
self.MOVIE_FILTER: Optional[Select] = None
self.REL_FILTER: Optional[Select] = None
# -------------------------------------------------------------------------
# Relationship deletion
# -------------------------------------------------------------------------
def delete_relationship_by_str(self, RDF: Select, uri: str) -> Select:
"""
Return a Select where rows having the given relationship URI are removed.
Original signature (pandas):
def delete_relationship_by_str(self, RDF: pd.DataFrame, uri: str) -> pd.DataFrame
Updated behavior:
- RDF is a Select with columns: MovieID, SubjectURI, RelationshipURI, ObjectURI, Abstract
- We apply a WHERE clause: RelationshipURI != <uri>
- Returns a Select you can continue composing.
Args:
RDF (Select): a selectable representing the RDF joined view
uri (str): RelationshipURI to exclude
Returns:
Select: filtered selectable (no execution yet)
"""
sc = RDF.selected_columns
return RDF.where(sc.RelationshipURI != literal(uri))
# -------------------------------------------------------------------------
# Frequency filter: MOVIE
# -------------------------------------------------------------------------
def generate_frequency_movie_filter(self, MOVIE_COUNT: Select, min_treshold: int, max_treshold: int):
"""
You MUST call this before filtering by movie frequency [filter_by_frequency_movie_id()],
since this method creates such filter.
Original behavior:
- Input MOVIE_COUNT as DataFrame ["MovieID","Count"]
- Keep rows where Count in [min_treshold, max_treshold)
- Store the filtered keys in self.MOVIE_FILTER
Updated behavior (SQL):
- MOVIE_COUNT is a Select that yields ["MovieID","Count"].
- We build and store a *subselect* of allowed MovieID (single column) to be used by WHERE IN.
- No query is executed here; we only create a new Select.
Args:
MOVIE_COUNT (Select): yields columns MovieID, Count
min_treshold (int):
max_treshold (int):
"""
sc = MOVIE_COUNT.selected_columns
filtered = MOVIE_COUNT.where(sc.Count >= min_treshold).where(sc.Count < max_treshold)
# Keep only the key column so it can be used in an IN (subquery)
self.MOVIE_FILTER = select(filtered.selected_columns.MovieID)
# -------------------------------------------------------------------------
# Frequency filter: RELATIONSHIP
# -------------------------------------------------------------------------
def generate_frequency_relationship_filter(self, REL_COUNT: Select, min_treshold: int, max_treshold: int):
"""
Original behavior:
- Input REL_COUNT as DataFrame ["RelationshipURI","Count"]
- Keep rows where Count in [min_treshold, max_treshold)
- Store the filtered keys in self.REL_FILTER
Updated behavior (SQL):
- REL_COUNT is a Select that yields ["RelationshipURI","Count"].
- We build and store a *subselect* of allowed RelationshipURI (single column) to be used by WHERE IN.
- No query is executed here; we only create a new Select.
Args:
REL_COUNT (Select): yields columns RelationshipURI, Count
min_treshold (int):
max_treshold (int):
"""
sc = REL_COUNT.selected_columns
filtered = REL_COUNT.where(sc.Count >= min_treshold).where(sc.Count < max_treshold)
self.REL_FILTER = select(filtered.selected_columns.RelationshipURI)
# -------------------------------------------------------------------------
# Apply frequency filters
# -------------------------------------------------------------------------
def filter_by_frequency_movie_id(self, RDF: Select) -> Select:
"""
Original behavior (pandas):
RDF = RDF[RDF["MovieID"].isin(self.MOVIE_FILTER["MovieID"])]
Updated behavior (SQL):
- If self.MOVIE_FILTER is present, apply: WHERE MovieID IN ( <subselect> )
- Otherwise, return RDF unchanged.
Args:
RDF (Select): current dataset
Returns:
Select: filtered dataset (or unchanged if no filter exists)
"""
if self.MOVIE_FILTER is None:
return RDF
sc = RDF.selected_columns
return RDF.where(sc.MovieID.in_(self.MOVIE_FILTER))
def filter_by_frequency_relationship(self, RDF: Select) -> Select:
"""
Original behavior (pandas):
RDF = RDF[RDF["RelationshipURI"].isin(self.REL_FILTER["RelationshipURI"])]
Updated behavior (SQL):
- If self.REL_FILTER is present, apply: WHERE RelationshipURI IN ( <subselect> )
- Otherwise, return RDF unchanged.
Args:
RDF (Select): current dataset
Returns:
Select: filtered dataset (or unchanged if no filter exists)
"""
if self.REL_FILTER is None:
return RDF
sc = RDF.selected_columns
return RDF.where(sc.RelationshipURI.in_(self.REL_FILTER))
# -------------------------------------------------------------------------
# Token prefixing (SubjectURI/RelationshipURI/ObjectURI)
# -------------------------------------------------------------------------
def rdf_add_special_token(self, RDF: Select) -> Select:
"""
Adds RDF special token to each element of the tuple. i.e: SUBJ to SubjectURI,
OBJ to ObjectURI, REL to RelationshipURI. Check
Scripts/Libs/CleaningPipeline/special_token.py for the up-to-date special token.
It only adds the special token of the three elements of the RDF; no other special token.
Original behavior (pandas):
- String concatenation with columns in a DataFrame.
- Returned a new DataFrame.
Updated behavior (SQL):
- Build projected columns using SQL string concatenation.
- Return a new Select with the same output column names:
["MovieID","SubjectURI","RelationshipURI","ObjectURI","Abstract"].
Args:
RDF (Select): current dataset
Returns:
Select: projected dataset with tokenized SubjectURI/RelationshipURI/ObjectURI
"""
sc = RDF.selected_columns
subj_tok = literal(SpecialToken.SUBJECT.value) + sc.SubjectURI
rel_tok = literal(SpecialToken.RELATIONSHIP.value) + sc.RelationshipURI
obj_tok = literal(SpecialToken.OBJECT.value) + sc.ObjectURI
return RDF.with_only_columns(
sc.MovieID.label("MovieID"),
subj_tok.label("SubjectURI"),
rel_tok.label("RelationshipURI"),
obj_tok.label("ObjectURI"),
sc.Abstract.label("Abstract"),
)
# -------------------------------------------------------------------------
# NA/empty drop on key columns (SubjectURI, RelationshipURI, ObjectURI)
# -------------------------------------------------------------------------
def drop_na_from_dataset(self, RDF: Select) -> Select:
"""
Dataset has SubjectURI, RelationshipURI, ObjectURI. We want to drop rows
where any of these is empty or NULL.
Original behavior (pandas):
- Replace '' with NaN and dropna on the three columns.
Updated behavior (SQL):
- Apply WHERE clauses checking for NOT NULL and not empty string.
Args:
RDF (Select): current dataset
Returns:
Select: dataset filtered to non-empty SubjectURI/RelationshipURI/ObjectURI
"""
sc = RDF.selected_columns
return RDF.where(
(sc.SubjectURI.is_not(None)) & (sc.SubjectURI != "") &
(sc.RelationshipURI.is_not(None)) & (sc.RelationshipURI != "") &
(sc.ObjectURI.is_not(None)) & (sc.ObjectURI != "")
)
# -------------------------------------------------------------------------
# Rebuild by movie (one row per movie)
# -------------------------------------------------------------------------
def rebuild_by_movie(self, RDF: Select) -> Select:
"""
To execute this method you have to have iterated by movie_id conceptually,
because as design we want at the end one row for each movie.
Original behavior (pandas):
- Build per-row "Triple" as SubjectURI + RelationshipURI + ObjectURI,
wrapped with START_TRIPLE/END_TRIPLE.
- Group by ["MovieID", "Abstract"] and join ("".join) all Triple strings into one.
- Prefix the whole list with START_TRIPLE_LIST and Abstract with ABSTRACT.
- Return DataFrame [["MovieID","Triple","Abstract"]].
Updated behavior (SQL):
- Build per-row Triple using SQL string concatenation and constants.
- Use GROUP_CONCAT (empty separator) to aggregate per-movie.
- Prefix with START_TRIPLE_LIST and ABSTRACT in SQL.
- Return a Select with columns: ["MovieID","Triple","Abstract"].
Args:
RDF (Select): current dataset with columns
MovieID, SubjectURI, RelationshipURI, ObjectURI, Abstract
Returns:
Select: aggregated dataset with one row per movie
"""
sc = RDF.selected_columns
# Per-row triple with START/END_TRIPLE tokens
row_triple = (
literal(SpecialToken.START_TRIPLE.value) +
(sc.SubjectURI + sc.RelationshipURI + sc.ObjectURI) +
literal(SpecialToken.END_TRIPLE.value)
).label("Triple")
# Prefixed abstract
abstract_tok = (literal(SpecialToken.ABSTRACT.value) + sc.Abstract).label("Abstract")
# Subquery of per-row triples / abstracts
row_view = RDF.with_only_columns(
sc.MovieID.label("MovieID"),
row_triple,
abstract_tok,
).subquery()
# Concatenate all triples for each movie (SQLite syntax; adjust for other DBs)
triple_concat = (
literal(SpecialToken.START_TRIPLE_LIST.value) +
func.group_concat(row_view.c.Triple, literal(""))
).label("Triple")
return (
select(
row_view.c.MovieID.label("MovieID"),
triple_concat,
row_view.c.Abstract.label("Abstract"),
)
.group_by(row_view.c.MovieID, row_view.c.Abstract)
)
# -------------------------------------------------------------------------
# Build triple(s) projection
# -------------------------------------------------------------------------
@staticmethod
def build_triple(RDF: Select) -> Select:
"""
Obtains joined RDF triple in one element, together with START and END special tokens.
Original behavior (pandas):
- Returned a Series/DataFrame column "Triple" built from three string columns.
Updated behavior (SQL):
- Returns a Select with a single column "Triple" built in SQL.
Args:
RDF (Select): at least columns ["SubjectURI", "RelationshipURI", "ObjectURI"]
Returns:
Select: a projection containing one column named "Triple"
"""
sc = RDF.selected_columns
triple = (
literal(SpecialToken.START_TRIPLE.value) +
(sc.SubjectURI + sc.RelationshipURI + sc.ObjectURI) +
literal(SpecialToken.END_TRIPLE.value)
).label("Triple")
return RDF.with_only_columns(triple)
@staticmethod
def build_incomplete_triple(RDF: Select) -> Select:
"""
Method helper used for the third task: "Predicting a masked component within an RDF triple".
Obtains joined RDF triple in one element, together with START and END special tokens.
The MISSING element will be replaced by the special token <MASK>.
Original behavior (pandas):
- Created a Series "Triple" using fallback values for missing columns.
Updated behavior (SQL):
- Uses COALESCE to replace NULLs with <MASK> directly in SQL.
- Returns a Select with a single column "Triple".
Args:
RDF (Select): 2 of the following columns present ["SubjectURI", "RelationshipURI", "ObjectURI"]
Returns:
Select: projection with column "Triple"
"""
sc = RDF.selected_columns
mask = literal(SpecialToken.MASK.value)
triple = (
literal(SpecialToken.START_TRIPLE.value) +
(func.coalesce(sc.SubjectURI, mask) +
func.coalesce(sc.RelationshipURI, mask) +
func.coalesce(sc.ObjectURI, mask)) +
literal(SpecialToken.END_TRIPLE.value)
).label("Triple")
return RDF.with_only_columns(triple)
@staticmethod
def build_for_mask_task(RDF_incomplete: Select, MISSING) -> None:
"""
Currently not used.
Original intention:
Given two DataFrames (one incomplete RDF and another with just the missing component),
apply special tokens accordingly.
Updated note:
This stub remains for API parity. If needed in the future, it can be implemented
as a Select-building helper that merges/COALESCEs columns from different selects.
"""
return None

View File

@ -0,0 +1,148 @@
# This file deletes in the pipeline the unwanted relationship by different rules
import pandas as pd
import sqlite3 # kept for compatibility
import numpy as np
from Scripts.Libs.CleaningPipeline.special_token import SpecialToken
from Scripts.Libs.CleaningPipeline.sql_endpoint import SqlEndpoint
class PipelineApplier:
def __init__(self):
# Fast internal caches for O(1) membership checks
self._MOVIE_FILTER_SET = set()
self._REL_FILTER_SET = set()
# ------------------------------
# Filters
# ------------------------------
def delete_relationship_by_str(self, RDF: pd.DataFrame, uri: str) -> pd.DataFrame:
# Vectorized boolean mask
return RDF.loc[RDF["RelationshipURI"] != uri]
def generate_frequency_movie_filter(self, MOVIE_COUNT: pd.DataFrame, min_threshold: int, max_threshold: int):
"""
You MUST call this before filter the dataset by movie frequency [filter_by_frequency_movie_id()],
since this method creates such filter.
Args:
MOVIE_COUNT (pd.DataFrame): ["MovieID","Count"]
"""
sel = (MOVIE_COUNT["Count"] >= min_threshold) & (MOVIE_COUNT["Count"] < max_threshold)
self._MOVIE_FILTER_SET = set(MOVIE_COUNT.loc[sel, "MovieID"].tolist())
def generate_frequency_relationship_filter(self, REL_COUNT: pd.DataFrame, min_threshold: int, max_threshold: int):
sel = (REL_COUNT["Count"] >= min_threshold) & (REL_COUNT["Count"] < max_threshold)
self._REL_FILTER_SET = set(REL_COUNT.loc[sel, "RelationshipURI"].tolist())
def filter_by_frequency_movie_id(self, RDF: pd.DataFrame) -> pd.DataFrame:
# Set-backed isin is the fastest path
return RDF.loc[RDF["MovieID"].isin(self._MOVIE_FILTER_SET)]
def filter_by_frequency_relationship(self, RDF: pd.DataFrame) -> pd.DataFrame:
return RDF.loc[RDF["RelationshipURI"].isin(self._REL_FILTER_SET)]
# ------------------------------
# Cleaning & preprocessing
# ------------------------------
def rdf_add_special_token(self, RDF: pd.DataFrame) -> pd.DataFrame:
"""
Adds RDF special token to SubjectURI / RelationshipURI / ObjectURI.
Returns a new DataFrame (no inplace modification of the caller's object).
"""
subj = np.char.add(SpecialToken.SUBJECT.value, RDF["SubjectURI"].to_numpy(dtype=object))
rel = np.char.add(SpecialToken.RELATIONSHIP.value, RDF["RelationshipURI"].to_numpy(dtype=object))
obj = np.char.add(SpecialToken.OBJECT.value, RDF["ObjectURI"].to_numpy(dtype=object))
return RDF.assign(SubjectURI=subj, RelationshipURI=rel, ObjectURI=obj)
def drop_na_from_dataset(self, RDF: pd.DataFrame) -> pd.DataFrame:
"""
Replace '' with NaN only on key columns, then drop rows missing any of them.
"""
cols = ["SubjectURI", "RelationshipURI", "ObjectURI"]
rdf = RDF.copy()
for c in cols:
m = rdf[c] == ""
if m.any():
rdf.loc[m, c] = np.nan
return rdf.dropna(subset=cols)
# ------------------------------
# Building triples
# ------------------------------
@staticmethod
def build_triple(RDF: pd.DataFrame):
"""
Obtains joined RDF triple in one element, together with START and END special token.
Returns:
pd.Series: RDF["Triple"] (just this column). Side-effect: sets RDF["Triple"].
"""
start = SpecialToken.START_TRIPLE.value
end = SpecialToken.END_TRIPLE.value
subj = RDF["SubjectURI"].to_numpy(dtype=object)
rel = RDF["RelationshipURI"].to_numpy(dtype=object)
obj = RDF["ObjectURI"].to_numpy(dtype=object)
arr = np.char.add(np.char.add(np.char.add(start, subj),
np.char.add(rel, obj)),
end)
RDF["Triple"] = pd.Series(arr, index=RDF.index, dtype=object, name="Triple")
return RDF["Triple"]
@staticmethod
def build_incomplete_triple(RDF: pd.DataFrame):
"""
Helper used for the third task: "Predicting a masked component within an RDF triple".
Accepts any subset of ["SubjectURI","RelationshipURI","ObjectURI"] (typically 2 of 3).
Missing components are replaced by <MASK>.
Returns:
pd.Series: RDF["Triple"] (just this column). Side-effect: sets RDF["Triple"].
"""
start = SpecialToken.START_TRIPLE.value
end = SpecialToken.END_TRIPLE.value
maskv = SpecialToken.MASK.value
n = len(RDF.index)
subj = RDF["SubjectURI"].to_numpy(dtype=object) if "SubjectURI" in RDF else np.full(n, maskv, dtype=object)
rel = RDF["RelationshipURI"].to_numpy(dtype=object) if "RelationshipURI" in RDF else np.full(n, maskv, dtype=object)
obj = RDF["ObjectURI"].to_numpy(dtype=object) if "ObjectURI" in RDF else np.full(n, maskv, dtype=object)
arr = np.char.add(np.char.add(np.char.add(start, subj),
np.char.add(rel, obj)),
end)
RDF["Triple"] = pd.Series(arr, index=RDF.index, dtype=object, name="Triple")
return RDF["Triple"]
def rebuild_by_movie(self, RDF: pd.DataFrame):
"""
Collapse triples + abstract into a single row per movie.
Returns: ["MovieID","Triple","Abstract"]
"""
# Build triples once (vectorized); method also sets RDF["Triple"]
triples = self.build_triple(RDF)
# Minimal frame for grouping (avoid carrying extra columns)
tmp = pd.DataFrame({
"MovieID": RDF["MovieID"].to_numpy(),
"Abstract": RDF["Abstract"].to_numpy(),
"Triple": triples.to_numpy(),
})
# Factorize high-cardinality keys to fast integer codes, group on codes,
# then map back to labels; sum concatenates strings for object dtype.
mid_codes, mid_uniques = pd.factorize(tmp["MovieID"], sort=False)
abs_codes, abs_uniques = pd.factorize(tmp["Abstract"], sort=False)
tmp["_mid"] = mid_codes
tmp["_abs"] = abs_codes
grouped = tmp.groupby(["_mid", "_abs"], sort=False, as_index=False)["Triple"].sum()
grouped["MovieID"] = grouped["_mid"].map(lambda i: mid_uniques[i])
grouped["Abstract"] = grouped["_abs"].map(lambda i: abs_uniques[i])
# Final tokens
grouped["Triple"] = SpecialToken.START_TRIPLE_LIST.value + grouped["Triple"]
grouped["Abstract"] = SpecialToken.ABSTRACT.value + grouped["Abstract"]
return grouped[["MovieID", "Triple", "Abstract"]]

View File

@ -26,6 +26,7 @@ class PipelineApplier():
"""Remove rows whose RelationshipURI is in the stored filter. Generate it first callig the generate_list_relationship_filter"""
return RDF[~RDF["RelationshipURI"].isin(self.relationship_filter_list)]
# def filter_movie_by_rel_uri_frequence()
def generate_frequency_movie_filter(self, MOVIE_COUNT: pd.DataFrame ,min_treshold: int, max_treshold: int):
"""

View File

@ -1,6 +1,6 @@
import re
from Scripts.Libs.CleaningPipeline.sql_endpoint import SqlEndpoint
from Scripts.DataCleaning.filter import PipelineApplier
from Scripts.DataCleaning.legacy.filter import PipelineApplier
# tasks dataset builder
from Scripts.DataCleaning.data_output_models.rdf_mask_task import RDF_mask_task_dataset
from Scripts.DataCleaning.data_output_models.bpe_corpus import BPE_corpus
@ -25,13 +25,16 @@ class Pipeline():
MOVIE_COUNT = self.sql_endpoint.get_movies_id_count()
REL_COUNT = self.sql_endpoint.get_relationship_count()
self.filter_applier.generate_frequency_movie_filter(MOVIE_COUNT,50,3000)
self.filter_applier.generate_frequency_relationship_filter(REL_COUNT, 50, 2395627)
self.filter_applier.generate_frequency_relationship_filter(REL_COUNT, 50, 2395627) # from 2718 to 3069
# prepare the filter on the relationshipURI you want to delete:
relationship_uri_banned_list = [
"dbp-dbp:wikiPageUsesTemplate","w3:2000/01/rdf-schema#label","dbp-dbo:abstract",
"dbp-dbo:wikiPageID","dbp-dbo:wikiPageRevisionID", "dbp-dbo:wikiPageDisambiguates",
"w3:2002/07/owl#sameAs","dbp-dbp:image","dbp-dbo:wikiPageLength", "w3:2000/01/rdf-schema#comment",
"dbp-dbo:thumbnail", "foaf:depiction", "w3:1999/02/22-rdf-syntax-ns#type"]
"dbp-dbo:thumbnail", "foaf:depiction", "w3:1999/02/22-rdf-syntax-ns#type",
"dbp-dbp:id","dbp-dbp:totalWidth", "w3:ns/prov#wasDerivedFrom", "dbp-dbp:n", "dbp-dbp:alt",
"dbp-dbo:soundRecording"
]
self.filter_applier.generate_list_relationship_filter(relationship_uri_banned_list)