# NOTE: this pipeline still works but is slower than the new one; some of its methods can be reused later
# This file deletes, in the pipeline, the unwanted relationships by different rules
# -----------------------------------------------------------------------------
# SQL-FIRST VERSION
# -----------------------------------------------------------------------------
# In the original (pandas) version this module:
# - stored frequency filters in DataFrames,
# - filtered/cleaned DataFrames in-memory,
# - added special tokens via string ops,
# - rebuilt one row per movie using groupby/aggregation.
#
# In this rewrite:
# - Every transformation RETURNS a SQLAlchemy `Select` object instead of a DataFrame.
# - Your pipeline can pass this `Select` (a "dataview") from one stage to the next,
#   composing more SQL lazily. Nothing is executed until you call `session.execute(...)`.
# - Frequency filters are represented as SUBSELECTS, applied with `WHERE IN (subquery)`.
#
# Notes:
# - We keep the same CLASS and METHOD NAMES to preserve call sites.
# - Method comments/docstrings from the original file are carried over and updated
#   to reflect Select-based behavior and return types.
# - We drop pandas/numpy/sqlite3 imports because filtering is pushed into SQL.
# - `GROUP_CONCAT` is used for the rebuild phase (SQLite-compatible). For other DBs,
#   swap with an equivalent string-agg function.
# -----------------------------------------------------------------------------

from __future__ import annotations

from typing import Optional

from sqlalchemy import select, func, literal
from sqlalchemy.sql import Select

from Scripts.Libs.CleaningPipeline.special_token import SpecialToken

class PipelineApplier:
    """
    SQL-first pipeline applier.

    In the pandas version, frequency filters were stored as DataFrames
    (self.MOVIE_FILTER / self.REL_FILTER) and every method worked with/returned
    pandas.DataFrame. In this SQLAlchemy rewrite:

    - self.MOVIE_FILTER and self.REL_FILTER are *subselects* (Select objects) that
      yield a single column each (MovieID or RelationshipURI). These subselects are
      applied via `WHERE <column> IN (subquery)`.

    - Every method that previously returned a DataFrame now returns a *Select* that
      represents the same logical transformation, pushed into the database engine.
      Nothing is executed until the caller runs `session.execute(...)`.
    """

    def __init__(self):
        # Single-column subselects of allowed keys, or None while no filter has
        # been generated yet. Expected output column names:
        #   - self.MOVIE_FILTER: "MovieID"
        #   - self.REL_FILTER:   "RelationshipURI"
        self.MOVIE_FILTER: Optional[Select] = None
        self.REL_FILTER: Optional[Select] = None

    # -------------------------------------------------------------------------
    # Relationship deletion
    # -------------------------------------------------------------------------
    def delete_relationship_by_str(self, RDF: Select, uri: str) -> Select:
        """
        Return a Select where rows having the given relationship URI are removed.

        RDF is expected to yield columns:
        MovieID, SubjectURI, RelationshipURI, ObjectURI, Abstract.
        We append `WHERE RelationshipURI != <uri>`; no query is executed here.

        Args:
            RDF (Select): a selectable representing the RDF joined view
            uri (str): RelationshipURI to exclude

        Returns:
            Select: filtered selectable (no execution yet)
        """
        sc = RDF.selected_columns
        return RDF.where(sc.RelationshipURI != literal(uri))

    # -------------------------------------------------------------------------
    # Frequency filter: MOVIE
    # -------------------------------------------------------------------------
    def generate_frequency_movie_filter(self, MOVIE_COUNT: Select, min_treshold: int, max_treshold: int):
        """
        You MUST call this before filtering by movie frequency
        [filter_by_frequency_movie_id()], since this method creates that filter.

        MOVIE_COUNT is a Select that yields ["MovieID", "Count"]. We keep rows
        where Count is in [min_treshold, max_treshold) and store a single-column
        subselect of the allowed MovieID values in self.MOVIE_FILTER, to be used
        later with `WHERE MovieID IN (subquery)`. No query is executed here.

        Args:
            MOVIE_COUNT (Select): yields columns MovieID, Count
            min_treshold (int): inclusive lower bound on Count
            max_treshold (int): exclusive upper bound on Count
        """
        sc = MOVIE_COUNT.selected_columns
        filtered = MOVIE_COUNT.where(sc.Count >= min_treshold).where(sc.Count < max_treshold)
        # BUGFIX: the previous `select(filtered.selected_columns.MovieID)` built a
        # brand-new Select from the bare column, which DROPPED the Count range
        # WHERE clauses — the filter admitted every MovieID. Projecting the
        # existing filtered Select preserves its WHERE clauses.
        self.MOVIE_FILTER = filtered.with_only_columns(sc.MovieID)

    # -------------------------------------------------------------------------
    # Frequency filter: RELATIONSHIP
    # -------------------------------------------------------------------------
    def generate_frequency_relationship_filter(self, REL_COUNT: Select, min_treshold: int, max_treshold: int):
        """
        You MUST call this before filtering by relationship frequency
        [filter_by_frequency_relationship()], since this method creates that filter.

        REL_COUNT is a Select that yields ["RelationshipURI", "Count"]. We keep rows
        where Count is in [min_treshold, max_treshold) and store a single-column
        subselect of the allowed RelationshipURI values in self.REL_FILTER, to be
        used later with `WHERE RelationshipURI IN (subquery)`. No query is executed
        here.

        Args:
            REL_COUNT (Select): yields columns RelationshipURI, Count
            min_treshold (int): inclusive lower bound on Count
            max_treshold (int): exclusive upper bound on Count
        """
        sc = REL_COUNT.selected_columns
        filtered = REL_COUNT.where(sc.Count >= min_treshold).where(sc.Count < max_treshold)
        # BUGFIX: same as generate_frequency_movie_filter — project the filtered
        # Select so the Count range WHERE clauses are kept in the subselect.
        self.REL_FILTER = filtered.with_only_columns(sc.RelationshipURI)

    # -------------------------------------------------------------------------
    # Apply frequency filters
    # -------------------------------------------------------------------------
    def filter_by_frequency_movie_id(self, RDF: Select) -> Select:
        """
        Apply the movie frequency filter, if one has been generated.

        Original behavior (pandas):
            RDF = RDF[RDF["MovieID"].isin(self.MOVIE_FILTER["MovieID"])]

        Updated behavior (SQL): if self.MOVIE_FILTER is present, apply
        `WHERE MovieID IN (<subselect>)`; otherwise return RDF unchanged.

        Args:
            RDF (Select): current dataset

        Returns:
            Select: filtered dataset (or unchanged if no filter exists)
        """
        if self.MOVIE_FILTER is None:
            return RDF
        sc = RDF.selected_columns
        return RDF.where(sc.MovieID.in_(self.MOVIE_FILTER))

    def filter_by_frequency_relationship(self, RDF: Select) -> Select:
        """
        Apply the relationship frequency filter, if one has been generated.

        Original behavior (pandas):
            RDF = RDF[RDF["RelationshipURI"].isin(self.REL_FILTER["RelationshipURI"])]

        Updated behavior (SQL): if self.REL_FILTER is present, apply
        `WHERE RelationshipURI IN (<subselect>)`; otherwise return RDF unchanged.

        Args:
            RDF (Select): current dataset

        Returns:
            Select: filtered dataset (or unchanged if no filter exists)
        """
        if self.REL_FILTER is None:
            return RDF
        sc = RDF.selected_columns
        return RDF.where(sc.RelationshipURI.in_(self.REL_FILTER))

    # -------------------------------------------------------------------------
    # Token prefixing (SubjectURI/RelationshipURI/ObjectURI)
    # -------------------------------------------------------------------------
    def rdf_add_special_token(self, RDF: Select) -> Select:
        """
        Adds the RDF special token to each element of the tuple, i.e. SUBJ to
        SubjectURI, OBJ to ObjectURI, REL to RelationshipURI. Check
        Scripts/Libs/CleaningPipeline/special_token.py for the up-to-date tokens.

        It only adds the special token to the three elements of the RDF; no other
        special token. The prefixing is done with SQL string concatenation, and the
        returned Select keeps the same output column names:
        ["MovieID", "SubjectURI", "RelationshipURI", "ObjectURI", "Abstract"].

        Args:
            RDF (Select): current dataset

        Returns:
            Select: projected dataset with tokenized SubjectURI/RelationshipURI/ObjectURI
        """
        sc = RDF.selected_columns
        subj_tok = literal(SpecialToken.SUBJECT.value) + sc.SubjectURI
        rel_tok = literal(SpecialToken.RELATIONSHIP.value) + sc.RelationshipURI
        obj_tok = literal(SpecialToken.OBJECT.value) + sc.ObjectURI

        return RDF.with_only_columns(
            sc.MovieID.label("MovieID"),
            subj_tok.label("SubjectURI"),
            rel_tok.label("RelationshipURI"),
            obj_tok.label("ObjectURI"),
            sc.Abstract.label("Abstract"),
        )

    # -------------------------------------------------------------------------
    # NA/empty drop on key columns (SubjectURI, RelationshipURI, ObjectURI)
    # -------------------------------------------------------------------------
    def drop_na_from_dataset(self, RDF: Select) -> Select:
        """
        Drop rows where SubjectURI, RelationshipURI or ObjectURI is NULL or ''.

        Original behavior (pandas): replace '' with NaN and dropna on the three
        columns. Updated behavior (SQL): WHERE clauses checking NOT NULL and
        non-empty string on each of the three columns.

        Args:
            RDF (Select): current dataset

        Returns:
            Select: dataset filtered to non-empty SubjectURI/RelationshipURI/ObjectURI
        """
        sc = RDF.selected_columns
        return RDF.where(
            (sc.SubjectURI.is_not(None)) & (sc.SubjectURI != "") &
            (sc.RelationshipURI.is_not(None)) & (sc.RelationshipURI != "") &
            (sc.ObjectURI.is_not(None)) & (sc.ObjectURI != "")
        )

    # -------------------------------------------------------------------------
    # Rebuild by movie (one row per movie)
    # -------------------------------------------------------------------------
    def rebuild_by_movie(self, RDF: Select) -> Select:
        """
        Aggregate the dataset so that, by design, there is ONE row per movie.

        Original behavior (pandas):
        - Build per-row "Triple" as SubjectURI + RelationshipURI + ObjectURI,
          wrapped with START_TRIPLE/END_TRIPLE.
        - Group by ["MovieID", "Abstract"] and "".join all Triple strings.
        - Prefix the list with START_TRIPLE_LIST and Abstract with ABSTRACT.

        Updated behavior (SQL):
        - Build the per-row Triple with SQL string concatenation.
        - Use GROUP_CONCAT with an empty separator to aggregate per movie
          (SQLite syntax; swap for an equivalent string-agg on other DBs).
        - Return a Select with columns ["MovieID", "Triple", "Abstract"].

        NOTE(review): SQLite does not guarantee GROUP_CONCAT concatenation order —
        confirm downstream consumers do not depend on triple ordering.

        Args:
            RDF (Select): current dataset with columns
                MovieID, SubjectURI, RelationshipURI, ObjectURI, Abstract

        Returns:
            Select: aggregated dataset with one row per movie
        """
        sc = RDF.selected_columns

        # Per-row triple with START/END_TRIPLE tokens
        row_triple = (
            literal(SpecialToken.START_TRIPLE.value) +
            (sc.SubjectURI + sc.RelationshipURI + sc.ObjectURI) +
            literal(SpecialToken.END_TRIPLE.value)
        ).label("Triple")

        # Prefixed abstract
        abstract_tok = (literal(SpecialToken.ABSTRACT.value) + sc.Abstract).label("Abstract")

        # Subquery of per-row triples / abstracts
        row_view = RDF.with_only_columns(
            sc.MovieID.label("MovieID"),
            row_triple,
            abstract_tok,
        ).subquery()

        # Concatenate all triples for each movie (SQLite syntax; adjust for other DBs)
        triple_concat = (
            literal(SpecialToken.START_TRIPLE_LIST.value) +
            func.group_concat(row_view.c.Triple, literal(""))
        ).label("Triple")

        return (
            select(
                row_view.c.MovieID.label("MovieID"),
                triple_concat,
                row_view.c.Abstract.label("Abstract"),
            )
            .group_by(row_view.c.MovieID, row_view.c.Abstract)
        )

    # -------------------------------------------------------------------------
    # Build triple(s) projection
    # -------------------------------------------------------------------------
    @staticmethod
    def build_triple(RDF: Select) -> Select:
        """
        Obtain the joined RDF triple in one element, together with the START and
        END special tokens, built entirely in SQL.

        Args:
            RDF (Select): at least columns ["SubjectURI", "RelationshipURI", "ObjectURI"]

        Returns:
            Select: a projection containing one column named "Triple"
        """
        sc = RDF.selected_columns
        triple = (
            literal(SpecialToken.START_TRIPLE.value) +
            (sc.SubjectURI + sc.RelationshipURI + sc.ObjectURI) +
            literal(SpecialToken.END_TRIPLE.value)
        ).label("Triple")
        return RDF.with_only_columns(triple)

    @staticmethod
    def build_incomplete_triple(RDF: Select) -> Select:
        """
        Helper for the third task: "Predicting a masked component within an RDF
        triple". Obtains the joined RDF triple in one element, together with the
        START and END special tokens; the MISSING element is replaced by the
        special token <MASK> via SQL COALESCE (NULL -> mask token).

        Args:
            RDF (Select): 2 of the following columns present
                ["SubjectURI", "RelationshipURI", "ObjectURI"]

        Returns:
            Select: projection with a single column "Triple"
        """
        sc = RDF.selected_columns
        mask = literal(SpecialToken.MASK.value)

        triple = (
            literal(SpecialToken.START_TRIPLE.value) +
            (func.coalesce(sc.SubjectURI, mask) +
             func.coalesce(sc.RelationshipURI, mask) +
             func.coalesce(sc.ObjectURI, mask)) +
            literal(SpecialToken.END_TRIPLE.value)
        ).label("Triple")
        return RDF.with_only_columns(triple)

    @staticmethod
    def build_for_mask_task(RDF_incomplete: Select, MISSING) -> None:
        """
        Currently not used; stub kept for API parity.

        Original intention: given two datasets (one incomplete RDF and another with
        just the missing component), apply special tokens accordingly. If needed in
        the future, implement as a Select-building helper that merges/COALESCEs
        columns from different selects.
        """
        return None