diff --git a/Scripts/DataCleaning/DataRetrivial.py b/Scripts/DataCleaning/DataRetrivial.py
index e1cc061..f9e0d4a 100644
--- a/Scripts/DataCleaning/DataRetrivial.py
+++ b/Scripts/DataCleaning/DataRetrivial.py
@@ -1,200 +1,249 @@
 import sqlite3
-import pandas as pd
+import csv
 
 # --- Global configuration ---
-DB_NAME = "./Assets/Dataset/DatawareHouse/dataset.db"
+DB_NAME = "./Assets/Dataset/Tmp/dataset.db"
 MOVIES_CSV = "./Assets/Dataset/1-hop/movies.csv"
 PAGEID_CSV = "./Assets/Dataset/1-hop/movie-pageid.csv"
 SUMMARY_CSV = "./Assets/Dataset/1-hop/wikipedia-summary.csv"
 DATASET_CSV = "./Assets/Dataset/1-hop/dataset.csv"
 REVERSE_CSV = "./Assets/Dataset/1-hop/reverse.csv"
-CHUNK_SIZE = 50000 # adjust based on memory
+
+
+# --- Helper: idempotent insert-or-select ---
+def get_or_create(cursor, table, column, value, origin_id=None):
+    # try to insert the value, then fetch its ID (whether or not the insert succeeded)
+    # Subjects and Objects need origin_id; Relationships do not
+    # try/except on INSERT keeps IDs contiguous (no AUTOINCREMENT jumps)
+    try:
+        if origin_id is not None:
+            cursor.execute(
+                f"INSERT INTO {table} ({column}, OriginID) VALUES (?, ?)",
+                (value, int(origin_id)),
+            )
+        else:
+            cursor.execute(
+                f"INSERT INTO {table} ({column}) VALUES (?)",
+                (value,),
+            )
+    except sqlite3.IntegrityError:
+        # row already exists, do nothing
+        pass
+
+    # Always fetch the ID (whether new or existing)
+    # {table[:-1]}ID ->
+    #   Subjects -> SubjectID
+    #   Objects -> ObjectID
+    #   Relationships -> RelationshipID
+    # relies on this naming convention (somewhat hardcoded)
+    cursor.execute(f"SELECT {table[:-1]}ID FROM {table} WHERE {column}=?", (value,))
+    return cursor.fetchone()[0]  # fetchone returns a single-row tuple
+
+
 # --- Load Movies ---
 def load_movies():
+    # Creates Movies: MovieID [PK] / MovieURI
+    # MovieID is assigned by SQLite
     conn = sqlite3.connect(DB_NAME)
-    total_inserted = 0
-    for chunk in pd.read_csv(MOVIES_CSV, chunksize=CHUNK_SIZE):
-        chunk.rename(columns={"subject": "MovieURI"}, inplace=True)
-        chunk["MovieURI"] = chunk["MovieURI"].astype(str).str.strip()
-        chunk.to_sql("Movies", conn, if_exists="append", index=False)
-        total_inserted += len(chunk)
-    print(f"Movies loaded: {total_inserted} rows inserted.")
+    cur = conn.cursor()
+    total = 0
+    # MOVIES_CSV: "subject" (its only column)
+    with open(MOVIES_CSV, newline="", encoding="utf-8") as f:
+        reader = csv.DictReader(f)
+        for row in reader:
+            movie_uri = row["subject"].strip()
+            # try/except on INSERT keeps IDs contiguous (no AUTOINCREMENT jumps)
+            try:
+                cur.execute("INSERT INTO Movies (MovieURI) VALUES (?)", (movie_uri,))
+                total += 1  # count only if a new row was added
+            except sqlite3.IntegrityError:
+                # already exists, skip
+                pass
+    conn.commit()  # suggested by dr
     conn.close()
+    print(f"Movies loaded: {total}")
+
 
 # --- Load Origins ---
 def load_origins():
+    # Creates Origins: OriginID [PK] / OriginName
+    # ["Dataset", "Reverse"]
     conn = sqlite3.connect(DB_NAME)
-    origins = pd.DataFrame({"OriginName": ["Dataset", "Reverse"]})
-    origins.to_sql("Origins", conn, if_exists="append", index=False)
+    cur = conn.cursor()
+    for origin in ["Dataset", "Reverse"]:
+        try:
+            cur.execute("INSERT INTO Origins (OriginName) VALUES (?)", (origin,))
+        except sqlite3.IntegrityError:
+            pass
+    conn.commit()
     conn.close()
     print("Origins loaded.")
+
 
 # --- Load WikiPageIDs ---
 def load_wikipageids():
+    # Creates WikiPageIDs: MovieID [PK, FK] / PageID [IDX] (wiki)
     conn = sqlite3.connect(DB_NAME)
-    movies_df = pd.read_sql_query("SELECT MovieID, MovieURI FROM Movies", conn)
-    movies_df["MovieURI"] = movies_df["MovieURI"].astype(str).str.strip()
-
-    total_inserted = 0
-    for chunk in pd.read_csv(PAGEID_CSV, chunksize=CHUNK_SIZE):
-        chunk.rename(columns={"subject": "MovieURI", "object": "PageID"}, inplace=True)
-        chunk["MovieURI"] = chunk["MovieURI"].astype(str).str.strip()
-        chunk["PageID"] = chunk["PageID"].astype(int)
-        merged = chunk.merge(movies_df, on="MovieURI", how="inner")
-        if not merged.empty:
-            merged[["MovieID", "PageID"]].to_sql("WikiPageIDs", conn, if_exists="append", index=False)
-            total_inserted += len(merged)
-    print(f"WikiPageIDs loaded: {total_inserted} rows inserted.")
+    cur = conn.cursor()
+    total = 0
+    # PAGEID_CSV: "subject","object" -> MovieURI, WikiPageID
+    with open(PAGEID_CSV, newline="", encoding="utf-8") as f:
+        reader = csv.DictReader(f)
+        for row in reader:
+            movie_uri = row["subject"].strip()
+            page_id = int(row["object"])
+            cur.execute("SELECT MovieID FROM Movies WHERE MovieURI=?", (movie_uri,))
+            movie = cur.fetchone()
+            if movie:
+                try:
+                    # could use INSERT OR IGNORE instead of try/except
+                    cur.execute(
+                        "INSERT INTO WikiPageIDs (MovieID, PageID) VALUES (?, ?)",
+                        (movie[0], page_id),
+                    )
+                    total += 1
+                except sqlite3.IntegrityError:
+                    pass
+    conn.commit()
     conn.close()
+    print(f"WikiPageIDs loaded: {total}")
+
 
 # --- Load Wikipedia Abstracts ---
 def load_abstracts():
+    # Creates WikipediaAbstracts: MovieID [PK, FK] / Abstract
     conn = sqlite3.connect(DB_NAME)
-    # Get MovieID mapping from WikiPageIDs
-    pageid_df = pd.read_sql_query("SELECT MovieID, PageID FROM WikiPageIDs", conn)
-    pageid_df["PageID"] = pageid_df["PageID"].astype(int)
-
-    total_inserted = 0
-    for chunk in pd.read_csv(SUMMARY_CSV, chunksize=CHUNK_SIZE):
-        chunk.rename(columns={"subject": "PageID", "text": "Abstract"}, inplace=True)
-        chunk["PageID"] = chunk["PageID"].astype(int)
-        merged = chunk.merge(pageid_df, on="PageID", how="inner")
-        if not merged.empty:
-            merged[["MovieID", "Abstract"]].to_sql("WikipediaAbstracts", conn, if_exists="append", index=False)
-            total_inserted += len(merged)
-    if total_inserted == 0:
-        print("No abstracts inserted — table WikipediaAbstracts is empty.")
-    else:
-        print(f"WikipediaAbstracts loaded: {total_inserted} rows inserted.")
+    cur = conn.cursor()
+    total = 0
+    # SUMMARY_CSV: "subject","text" -> WikiPageID, Abstract
+    with open(SUMMARY_CSV, newline="", encoding="utf-8") as f:
+        reader = csv.DictReader(f)
+        for row in reader:
+            page_id = int(row["subject"])
+            abstract = row["text"].strip()
+            # WikiPageIDs: MovieID [PK, FK] / PageID [IDX] (wiki)
+            cur.execute("SELECT MovieID FROM WikiPageIDs WHERE PageID=?", (page_id,))
+            movie = cur.fetchone()  # (MovieID,) or None
+            if movie:
+                try:
+                    # could use INSERT OR IGNORE instead of try/except
+                    cur.execute(
+                        "INSERT INTO WikipediaAbstracts (MovieID, Abstract) VALUES (?, ?)",
+                        (movie[0], abstract),
+                    )
+                    total += 1
+                except sqlite3.IntegrityError as e:
+                    print(e)
+    conn.commit()
     conn.close()
+    print(f"WikipediaAbstracts loaded: {total}")
+
 
 # --- Load Dataset RDFs ---
-# --- Helper function to get or create an entry and return its ID ---
-def get_or_create(cursor, table, column, value, origin_id=None):
-    # is idempotent!
-    # Check existence only on the value itself (because the column is UNIQUE)
-    cursor.execute(f"SELECT {table[:-1]}ID FROM {table} WHERE {column}=?", (value,))
-    result = cursor.fetchone()
-    if result:
-        return result[0] # IDEMPOTENT: if the object already exists, there isn't another insert
-    else:
-        if origin_id is not None:
-            cursor.execute(f"INSERT INTO {table} ({column}, OriginID) VALUES (?, ?)", (value, origin_id))
-        else:
-            cursor.execute(f"INSERT INTO {table} ({column}) VALUES (?)", (value,))
-        return cursor.lastrowid
-
-# --- Load Dataset RDFs ---
-def load_dataset():
+def load_dataset():
     conn = sqlite3.connect(DB_NAME)
-    cursor = conn.cursor()
+    cur = conn.cursor()
+    # get origin_id for the 'Dataset' origin
+    cur.execute("SELECT OriginID FROM Origins WHERE OriginName='Dataset'")
+    origin_id = int(cur.fetchone()[0])
+    print(f"Origin_id is: {origin_id}")
+    ####
+    total = 0
+    skipped_movie = 0
+    # DATASET_CSV: "subject","relationship","object" -> MovieURI, RelationshipURI, ObjectURI
+    with open(DATASET_CSV, newline="", encoding="utf-8") as f:
+        reader = csv.DictReader(f)
+        for row in reader:
+            movie_uri = row["subject"].strip()
+            relationship_uri = row["relationship"].strip()
+            object_uri = row["object"].strip()
 
-    # --- Load Movies mapping ---
-    movies_df = pd.read_sql_query("SELECT MovieID, MovieURI FROM Movies", conn)
-    movies_df["MovieURI"] = movies_df["MovieURI"].astype(str).str.strip()
+            # guard: if no MovieID exists for the given MovieURI, skip the row
+            cur.execute("SELECT MovieID FROM Movies WHERE MovieURI=?", (movie_uri,))
+            movie = cur.fetchone()
+            if not movie:
+                skipped_movie += 1
+                continue
 
-    # --- Get Dataset OriginID ---
-    origin_id = pd.read_sql_query("SELECT OriginID FROM Origins WHERE OriginName='Dataset'", conn)["OriginID"].iloc[0]
+            # insert each URI into its own table and retrieve the corresponding IDs
+            subject_id = get_or_create(cur, "Subjects", "SubjectURI", movie_uri, origin_id)
+            relationship_id = get_or_create(cur, "Relationships", "RelationshipURI", relationship_uri)
+            object_id = get_or_create(cur, "Objects", "ObjectURI", object_uri, origin_id)
 
-    total_rdfs = 0
-
-    for chunk in pd.read_csv(DATASET_CSV, chunksize=CHUNK_SIZE):
-        chunk.rename(columns={"subject": "MovieURI", "relationship": "Relationship", "object": "ObjectURI"}, inplace=True)
-        chunk["MovieURI"] = chunk["MovieURI"].astype(str).str.strip()
-        chunk["ObjectURI"] = chunk["ObjectURI"].astype(str).str.strip()
-        chunk["Relationship"] = chunk["Relationship"].astype(str).str.strip()
-
-        # --- Merge to get MovieID ---
-        merged = chunk.merge(movies_df, on="MovieURI", how="inner") # movideId / Subject (MovieUri) / Rel / Obj
-        if merged.empty:
-            continue
-
-        for _, row in merged.iterrows(): # HERE, EACH ROW IS ELABORATED ALONE
-            # Subjects: the Movie itself as SubjectURI <---- Remember Subject is renamed as MovieURI
-            subject_id = get_or_create(cursor, "Subjects", "SubjectURI", row["MovieURI"], origin_id)
-
-            # Relationships
-            relationship_id = get_or_create(cursor, "Relationships", "RelationshipURI", row["Relationship"])
-
-            # Objects
-            object_id = get_or_create(cursor, "Objects", "ObjectURI", row["ObjectURI"], origin_id)
-
-            # RDFs: only insert if the triplet doesn't exist
-            cursor.execute(
+            # check whether the triple already exists in RDFs
+            cur.execute(
                 "SELECT RDF_ID FROM RDFs WHERE SubjectID=? AND RelationshipID=? AND ObjectID=?",
-                (subject_id, relationship_id, object_id)
+                (subject_id, relationship_id, object_id),
             )
-            if not cursor.fetchone():
-                cursor.execute(
+            if not cur.fetchone():
+                cur.execute(
                     "INSERT INTO RDFs (MovieID, SubjectID, RelationshipID, ObjectID) VALUES (?, ?, ?, ?)",
-                    (row["MovieID"], subject_id, relationship_id, object_id)
+                    (movie[0], subject_id, relationship_id, object_id),
                 )
-                total_rdfs += 1
-
-    conn.commit()
-
-    print(f"Dataset RDFs loaded: {total_rdfs} rows inserted.")
+                total += 1
+    conn.commit()
     conn.close()
+    print(f"Dataset RDFs loaded: {total}")
+    print(f"Skipped Movies: {skipped_movie}")
+
 
-# --- Load Reverse RDFs ---
 # --- Load Reverse RDFs ---
 def load_reverse():
     conn = sqlite3.connect(DB_NAME)
-    cursor = conn.cursor()
+    cur = conn.cursor()
+    # get origin_id for the 'Reverse' origin
+    cur.execute("SELECT OriginID FROM Origins WHERE OriginName='Reverse'")
+    origin_id = int(cur.fetchone()[0])
+    print(f"Origin_id is: {origin_id}")
+    ###
+    total = 0
+    skipped_movie = 0
+    # REVERSE_CSV: "subject","relationship","object" -> SubjectURI, RelationshipURI, MovieURI
+    with open(REVERSE_CSV, newline="", encoding="utf-8") as f:
+        reader = csv.DictReader(f)
+        for row in reader:
+            subject_uri = row["subject"].strip()
+            relationship_uri = row["relationship"].strip()
+            movie_uri = row["object"].strip()
 
-    # --- Load Movies mapping ---
-    movies_df = pd.read_sql_query("SELECT MovieID, MovieURI FROM Movies", conn)
-    movies_df["MovieURI"] = movies_df["MovieURI"].astype(str).str.strip()
+            # guard: if no MovieID exists for the given MovieURI, skip the row
+            cur.execute("SELECT MovieID FROM Movies WHERE MovieURI=?", (movie_uri,))
+            movie = cur.fetchone()
+            if not movie:
+                skipped_movie += 1
+                continue
 
-    # --- Get Reverse OriginID ---
-    origin_id = pd.read_sql_query("SELECT OriginID FROM Origins WHERE OriginName='Reverse'", conn)["OriginID"].iloc[0]
+            # insert each URI into its own table and retrieve the corresponding IDs
+            subject_id = get_or_create(cur, "Subjects", "SubjectURI", subject_uri, origin_id)
+            relationship_id = get_or_create(cur, "Relationships", "RelationshipURI", relationship_uri)
+            object_id = get_or_create(cur, "Objects", "ObjectURI", movie_uri, origin_id)
 
-    total_rdfs = 0
-
-    for chunk in pd.read_csv(REVERSE_CSV, chunksize=CHUNK_SIZE):
-        chunk.rename(columns={"subject": "SubjectURI", "relationship": "Relationship", "object": "MovieURI"}, inplace=True)
-        chunk["MovieURI"] = chunk["MovieURI"].astype(str).str.strip()
-        chunk["SubjectURI"] = chunk["SubjectURI"].astype(str).str.strip()
-        chunk["Relationship"] = chunk["Relationship"].astype(str).str.strip()
-
-        # --- Merge to get MovieID ---
-        merged = chunk.merge(movies_df, on="MovieURI", how="inner")
-        if merged.empty:
-            continue
-
-        for _, row in merged.iterrows():
-            # Subjects: from Reverse CSV
-            subject_id = get_or_create(cursor, "Subjects", "SubjectURI", row["SubjectURI"], origin_id)
-
-            # Relationships
-            relationship_id = get_or_create(cursor, "Relationships", "RelationshipURI", row["Relationship"])
-
-            # Objects: the Movie itself as ObjectURI
-            object_id = get_or_create(cursor, "Objects", "ObjectURI", row["MovieURI"], origin_id)
-
-            # RDFs: only insert if the triplet doesn't exist
-            cursor.execute(
+            # check whether the triple already exists in RDFs
+            cur.execute(
                 "SELECT RDF_ID FROM RDFs WHERE SubjectID=? AND RelationshipID=? AND ObjectID=?",
-                (subject_id, relationship_id, object_id)
+                (subject_id, relationship_id, object_id),
            )
-            if not cursor.fetchone():
-                cursor.execute(
+            if not cur.fetchone():
+                cur.execute(
                     "INSERT INTO RDFs (MovieID, SubjectID, RelationshipID, ObjectID) VALUES (?, ?, ?, ?)",
-                    (row["MovieID"], subject_id, relationship_id, object_id)
+                    (movie[0], subject_id, relationship_id, object_id),
                 )
-                total_rdfs += 1
-
-    conn.commit()
-
-    print(f"Reverse RDFs loaded: {total_rdfs} rows inserted.")
+                total += 1
+    conn.commit()
     conn.close()
+    print(f"Reverse RDFs loaded: {total}")
+    print(f"Skipped Movies: {skipped_movie}")
 
-# --- Append the calls ---
-# load_movies()
-# load_origins()
-# load_wikipageids()
-# load_abstracts()
-# load_dataset()
+
+# --- Execution order ---
+load_movies()
+load_origins()
+load_wikipageids()
+load_abstracts()
+load_dataset()
 load_reverse()
+# sqlite3 ./Assets/Dataset/Tmp/dataset.db < ./Scripts/DataCleaning/SQL_Queries/db_creation.sql
\ No newline at end of file