# This module is the cleaning-pipeline stage that removes unwanted relationships
# (and otherwise reshapes RDF rows) according to different rules.
# -----------------------------------------------------------------------------
# SQL-FIRST VERSION
# -----------------------------------------------------------------------------
# In the original (pandas) version this module:
#   - stored frequency filters in DataFrames,
#   - filtered/cleaned DataFrames in-memory,
#   - added special tokens via string ops,
#   - rebuilt one row per movie using groupby/aggregation.
#
# In this rewrite:
#   - Every transformation RETURNS a SQLAlchemy `Select` object instead of a DataFrame.
#   - Your pipeline can pass this `Select` (a "dataview") from one stage to the next,
#     composing more SQL lazily. Nothing is executed until you call `session.execute(...)`.
#   - Frequency filters are represented as SUBSELECTS, applied with `WHERE IN (subquery)`.
#
# Notes:
#   - We keep the same CLASS and METHOD NAMES to preserve call sites.
#   - Method comments/docstrings from the original file are carried over and updated
#     to reflect Select-based behavior and return types.
#   - We drop pandas/numpy/sqlite3 imports because filtering is pushed into SQL.
#   - `GROUP_CONCAT` is used for the rebuild phase (SQLite-compatible). For other DBs,
#     swap it for an equivalent string-aggregation function.
# -----------------------------------------------------------------------------

from __future__ import annotations

from typing import Optional

from sqlalchemy import select, func, literal
from sqlalchemy.sql import Select

from Scripts.Libs.CleaningPipeline.special_token import SpecialToken


class PipelineApplier:
    """
    SQL-first pipeline applier.

    In the pandas version, frequency filters were stored as DataFrames
    (self.MOVIE_FILTER / self.REL_FILTER) and every method worked with and
    returned a pandas.DataFrame.

    In this SQLAlchemy rewrite:
      - self.MOVIE_FILTER and self.REL_FILTER become *subselects* (Select objects)
        that yield a single column each (MovieID or RelationshipURI).
        These subselects are applied via `WHERE ... IN (subquery)`.
      - Every method that previously returned a DataFrame now returns a *Select*.
        That Select represents the same logical transformation, pushed into the
        database engine.
      - Comments and docstrings reflect SQL semantics while preserving the
        original intent.
    """

    def __init__(self):
        # In the pandas version these were DataFrames storing allowed keys.
        # Here they are single-column subselects (Select objects), or None when
        # the corresponding generate_frequency_*_filter() has not been called.
        # Column names:
        #   - self.MOVIE_FILTER: "MovieID"
        #   - self.REL_FILTER:   "RelationshipURI"
        self.MOVIE_FILTER: Optional[Select] = None
        self.REL_FILTER: Optional[Select] = None

    # -------------------------------------------------------------------------
    # Internal helpers
    # -------------------------------------------------------------------------
    @staticmethod
    def _triple_expression(subject, relationship, obj):
        """
        Build the (unlabeled) SQL expression
        START_TRIPLE || subject || relationship || obj || END_TRIPLE.

        The concatenation is folded left-to-right starting from a string literal.
        Every `+` is therefore compiled as SQL string concatenation (`||`), even
        when the column types are unknown to SQLAlchemy. Adding two untyped
        columns directly (`col + col`) would compile to arithmetic `+` instead.
        """
        return (
            literal(SpecialToken.START_TRIPLE.value)
            + subject
            + relationship
            + obj
            + literal(SpecialToken.END_TRIPLE.value)
        )

    # -------------------------------------------------------------------------
    # Relationship deletion
    # -------------------------------------------------------------------------
    def delete_relationship_by_str(self, RDF: Select, uri: str) -> Select:
        """
        Return a Select where rows having the given relationship URI are removed.

        Original signature (pandas):
            def delete_relationship_by_str(self, RDF: pd.DataFrame, uri: str) -> pd.DataFrame

        Updated behavior:
            - RDF is a Select with columns:
                MovieID, SubjectURI, RelationshipURI, ObjectURI, Abstract
            - We apply the WHERE clause: RelationshipURI != <uri>
            - Returns a Select you can continue composing.

        Notes:
            - Under SQL three-valued logic, `NULL != <uri>` is not true, so rows with
              a NULL RelationshipURI are dropped as well (pandas would keep NaN rows).
            - The comparison is made against whatever RDF currently selects as
              RelationshipURI. Call this before rdf_add_special_token(); otherwise
              the compared value is the token-prefixed one.

        Args:
            RDF (Select): a selectable representing the RDF joined view
            uri (str): RelationshipURI to exclude

        Returns:
            Select: filtered selectable (no execution yet)
        """
        sc = RDF.selected_columns
        return RDF.where(sc.RelationshipURI != literal(uri))

    # -------------------------------------------------------------------------
    # Frequency filter: MOVIE
    # -------------------------------------------------------------------------
    def generate_frequency_movie_filter(self, MOVIE_COUNT: Select, min_treshold: int, max_treshold: int) -> None:
        """
        You MUST call this before filtering by movie frequency
        [filter_by_frequency_movie_id()], since this method creates that filter.

        Original behavior:
            - Input MOVIE_COUNT as DataFrame ["MovieID", "Count"]
            - Keep rows where Count in [min_treshold, max_treshold)
            - Store the filtered keys in self.MOVIE_FILTER

        Updated behavior (SQL):
            - MOVIE_COUNT is a Select that yields ["MovieID", "Count"].
            - We store a single-column *subselect* of allowed MovieID values,
              to be used by WHERE ... IN.
            - No query is executed here; we only build a new Select.

        Args:
            MOVIE_COUNT (Select): yields columns MovieID, Count
            min_treshold (int): inclusive lower bound on Count
            max_treshold (int): exclusive upper bound on Count
        """
        # Wrap MOVIE_COUNT in a subquery so that:
        #   - its own FROM/WHERE/GROUP BY are preserved, and
        #   - "Count" becomes a plain column of the derived table.
        # Filtering on it in our WHERE is then valid SQL even when Count is an
        # aggregate (e.g. COUNT(*)) inside MOVIE_COUNT. Building
        # `select(<MovieID column>)` directly would start a fresh SELECT and drop
        # the range conditions entirely.
        counts = MOVIE_COUNT.subquery()
        self.MOVIE_FILTER = (
            select(counts.c.MovieID)
            .where(counts.c.Count >= min_treshold)
            .where(counts.c.Count < max_treshold)
        )

    # -------------------------------------------------------------------------
    # Frequency filter: RELATIONSHIP
    # -------------------------------------------------------------------------
    def generate_frequency_relationship_filter(self, REL_COUNT: Select, min_treshold: int, max_treshold: int) -> None:
        """
        You MUST call this before filtering by relationship frequency
        [filter_by_frequency_relationship()], since this method creates that filter.

        Original behavior:
            - Input REL_COUNT as DataFrame ["RelationshipURI", "Count"]
            - Keep rows where Count in [min_treshold, max_treshold)
            - Store the filtered keys in self.REL_FILTER

        Updated behavior (SQL):
            - REL_COUNT is a Select that yields ["RelationshipURI", "Count"].
            - We store a single-column *subselect* of allowed RelationshipURI
              values, to be used by WHERE ... IN.
            - No query is executed here; we only build a new Select.

        Args:
            REL_COUNT (Select): yields columns RelationshipURI, Count
            min_treshold (int): inclusive lower bound on Count
            max_treshold (int): exclusive upper bound on Count
        """
        # See generate_frequency_movie_filter(): the subquery keeps REL_COUNT's
        # own clauses intact and makes Count safe to filter on with WHERE.
        counts = REL_COUNT.subquery()
        self.REL_FILTER = (
            select(counts.c.RelationshipURI)
            .where(counts.c.Count >= min_treshold)
            .where(counts.c.Count < max_treshold)
        )

    # -------------------------------------------------------------------------
    # Apply frequency filters
    # -------------------------------------------------------------------------
    def filter_by_frequency_movie_id(self, RDF: Select) -> Select:
        """
        Original behavior (pandas):
            RDF = RDF[RDF["MovieID"].isin(self.MOVIE_FILTER["MovieID"])]

        Updated behavior (SQL):
            - If self.MOVIE_FILTER is present, apply:
                WHERE MovieID IN (<self.MOVIE_FILTER>)
            - Otherwise, return RDF unchanged.

        Args:
            RDF (Select): current dataset

        Returns:
            Select: filtered dataset (or unchanged if no filter exists)
        """
        if self.MOVIE_FILTER is None:
            return RDF
        sc = RDF.selected_columns
        return RDF.where(sc.MovieID.in_(self.MOVIE_FILTER))

    def filter_by_frequency_relationship(self, RDF: Select) -> Select:
        """
        Original behavior (pandas):
            RDF = RDF[RDF["RelationshipURI"].isin(self.REL_FILTER["RelationshipURI"])]

        Updated behavior (SQL):
            - If self.REL_FILTER is present, apply:
                WHERE RelationshipURI IN (<self.REL_FILTER>)
            - Otherwise, return RDF unchanged.

        Note:
            The membership test uses whatever RDF currently selects as
            RelationshipURI. Apply it before rdf_add_special_token(), or the
            token-prefixed values will not match the stored keys.

        Args:
            RDF (Select): current dataset

        Returns:
            Select: filtered dataset (or unchanged if no filter exists)
        """
        if self.REL_FILTER is None:
            return RDF
        sc = RDF.selected_columns
        return RDF.where(sc.RelationshipURI.in_(self.REL_FILTER))

    # -------------------------------------------------------------------------
    # Token prefixing (SubjectURI/RelationshipURI/ObjectURI)
    # -------------------------------------------------------------------------
    def rdf_add_special_token(self, RDF: Select) -> Select:
        """
        Add the RDF special token to each element of the triple,
        i.e. SUBJ to SubjectURI, REL to RelationshipURI, OBJ to ObjectURI.
        Check Scripts/Libs/CleaningPipeline/special_token.py for the
        up-to-date special tokens.

        Only the special tokens of the three RDF elements are added; no other
        special token.

        Original behavior (pandas):
            - String concatenation with columns in a DataFrame.
            - Returned a new DataFrame.

        Updated behavior (SQL):
            - Build projected columns using SQL string concatenation.
            - Return a new Select with the same output column names:
              ["MovieID", "SubjectURI", "RelationshipURI", "ObjectURI", "Abstract"].
            - Concatenating with a NULL yields NULL in SQL, so NULL elements stay NULL.

        Args:
            RDF (Select): current dataset

        Returns:
            Select: projected dataset with tokenized SubjectURI/RelationshipURI/ObjectURI
        """
        sc = RDF.selected_columns

        # The String-typed literal on the left makes `+` compile to `||`.
        subj_tok = literal(SpecialToken.SUBJECT.value) + sc.SubjectURI
        rel_tok = literal(SpecialToken.RELATIONSHIP.value) + sc.RelationshipURI
        obj_tok = literal(SpecialToken.OBJECT.value) + sc.ObjectURI

        return RDF.with_only_columns(
            sc.MovieID.label("MovieID"),
            subj_tok.label("SubjectURI"),
            rel_tok.label("RelationshipURI"),
            obj_tok.label("ObjectURI"),
            sc.Abstract.label("Abstract"),
        )

    # -------------------------------------------------------------------------
    # NA/empty drop on key columns (SubjectURI, RelationshipURI, ObjectURI)
    # -------------------------------------------------------------------------
    def drop_na_from_dataset(self, RDF: Select) -> Select:
        """
        Drop rows where SubjectURI, RelationshipURI or ObjectURI is empty or NULL.

        Original behavior (pandas):
            - Replace '' with NaN and dropna on the three columns.

        Updated behavior (SQL):
            - Apply WHERE clauses requiring each column to be NOT NULL and not ''.

        Args:
            RDF (Select): current dataset

        Returns:
            Select: dataset filtered to non-empty SubjectURI/RelationshipURI/ObjectURI
        """
        sc = RDF.selected_columns
        return RDF.where(
            (sc.SubjectURI.is_not(None)) & (sc.SubjectURI != "") &
            (sc.RelationshipURI.is_not(None)) & (sc.RelationshipURI != "") &
            (sc.ObjectURI.is_not(None)) & (sc.ObjectURI != "")
        )

    # -------------------------------------------------------------------------
    # Rebuild by movie (one row per movie)
    # -------------------------------------------------------------------------
    def rebuild_by_movie(self, RDF: Select) -> Select:
        """
        Collapse the dataset to one row per movie.

        Call this only once the data is organized per movie: by design, the
        pipeline ends with exactly one row per MovieID.

        Original behavior (pandas):
            - Build a per-row "Triple" as SubjectURI + RelationshipURI + ObjectURI,
              wrapped with START_TRIPLE/END_TRIPLE.
            - Group by ["MovieID", "Abstract"] and join ("".join) all Triple
              strings into one.
            - Prefix the joined triples with START_TRIPLE_LIST and the Abstract
              with ABSTRACT.
            - Return DataFrame [["MovieID", "Triple", "Abstract"]].

        Updated behavior (SQL):
            - Build the per-row Triple using SQL string concatenation and constants.
            - Use GROUP_CONCAT (empty separator) to aggregate per movie.
            - Prefix with START_TRIPLE_LIST and ABSTRACT in SQL.
            - Return a Select with columns: ["MovieID", "Triple", "Abstract"].

        Note:
            The order in which GROUP_CONCAT joins rows is not specified by SQLite.
            Unlike pandas' "".join, triple order inside a movie is not guaranteed.

        Args:
            RDF (Select): current dataset with columns
                MovieID, SubjectURI, RelationshipURI, ObjectURI, Abstract

        Returns:
            Select: aggregated dataset with one row per (MovieID, Abstract)
        """
        sc = RDF.selected_columns

        # Per-row triple with START/END_TRIPLE tokens
        row_triple = PipelineApplier._triple_expression(
            sc.SubjectURI, sc.RelationshipURI, sc.ObjectURI
        ).label("Triple")

        # Prefixed abstract
        abstract_tok = (literal(SpecialToken.ABSTRACT.value) + sc.Abstract).label("Abstract")

        # Subquery of per-row triples / abstracts
        row_view = RDF.with_only_columns(
            sc.MovieID.label("MovieID"),
            row_triple,
            abstract_tok,
        ).subquery()

        # Concatenate all triples for each movie (SQLite syntax; adjust for other DBs)
        triple_concat = (
            literal(SpecialToken.START_TRIPLE_LIST.value)
            + func.group_concat(row_view.c.Triple, literal(""))
        ).label("Triple")

        return (
            select(
                row_view.c.MovieID.label("MovieID"),
                triple_concat,
                row_view.c.Abstract.label("Abstract"),
            )
            .group_by(row_view.c.MovieID, row_view.c.Abstract)
        )

    # -------------------------------------------------------------------------
    # Build triple(s) projection
    # -------------------------------------------------------------------------
    @staticmethod
    def build_triple(RDF: Select) -> Select:
        """
        Join the RDF triple into one element, wrapped with the START and END
        special tokens.

        Original behavior (pandas):
            - Returned a Series/DataFrame column "Triple" built from three string columns.

        Updated behavior (SQL):
            - Returns a Select with a single column "Triple" built in SQL.

        Args:
            RDF (Select): at least columns ["SubjectURI", "RelationshipURI", "ObjectURI"]

        Returns:
            Select: a projection containing one column named "Triple"
        """
        sc = RDF.selected_columns
        triple = PipelineApplier._triple_expression(
            sc.SubjectURI, sc.RelationshipURI, sc.ObjectURI
        ).label("Triple")
        return RDF.with_only_columns(triple)

    @staticmethod
    def build_incomplete_triple(RDF: Select) -> Select:
        """
        Helper for the third task: "Predicting a masked component within an RDF triple".

        Join the RDF triple into one element, wrapped with the START and END
        special tokens. The MISSING element is replaced by the MASK special token.

        Original behavior (pandas):
            - Created a Series "Triple" using fallback values for missing columns.

        Updated behavior (SQL):
            - A column that RDF does not select at all is replaced by the MASK token.
            - A selected column whose value is NULL is replaced by MASK via COALESCE.
            - Returns a Select with a single column "Triple".

        Args:
            RDF (Select): at least 2 of the columns
                ["SubjectURI", "RelationshipURI", "ObjectURI"]

        Returns:
            Select: projection with column "Triple"
        """
        sc = RDF.selected_columns
        mask = literal(SpecialToken.MASK.value)

        def component(name: str):
            # `get` returns None for a column RDF does not select, instead of
            # raising AttributeError like attribute access would.
            column = sc.get(name)
            return mask if column is None else func.coalesce(column, mask)

        triple = PipelineApplier._triple_expression(
            component("SubjectURI"),
            component("RelationshipURI"),
            component("ObjectURI"),
        ).label("Triple")
        return RDF.with_only_columns(triple)

    @staticmethod
    def build_for_mask_task(RDF_incomplete: Select, MISSING) -> None:
        """
        Currently not used; always returns None.

        Original intention:
            Given two DataFrames (one incomplete RDF and another with just the
            missing component), apply special tokens accordingly.

        Updated note:
            This stub remains for API parity. If needed in the future, it can be
            implemented as a Select-building helper that merges/COALESCEs columns
            from different selects.
        """
        return None