From 8819b8e87fd10c2edf51d99a0a9ca3be461574de Mon Sep 17 00:00:00 2001
From: GassiGiuseppe
Date: Sat, 20 Sep 2025 19:56:24 +0200
Subject: [PATCH] DataRetrivial: populate the DB from CSV

---
 Scripts/DataCleaning/DataRetrivial.py | 200 ++++++++++++++++++++++++++
 1 file changed, 200 insertions(+)
 create mode 100644 Scripts/DataCleaning/DataRetrivial.py

diff --git a/Scripts/DataCleaning/DataRetrivial.py b/Scripts/DataCleaning/DataRetrivial.py
new file mode 100644
index 0000000..e1cc061
--- /dev/null
+++ b/Scripts/DataCleaning/DataRetrivial.py
@@ -0,0 +1,200 @@
import sqlite3
import pandas as pd

# --- Global configuration ---
DB_NAME = "./Assets/Dataset/DatawareHouse/dataset.db"
MOVIES_CSV = "./Assets/Dataset/1-hop/movies.csv"
PAGEID_CSV = "./Assets/Dataset/1-hop/movie-pageid.csv"
SUMMARY_CSV = "./Assets/Dataset/1-hop/wikipedia-summary.csv"
DATASET_CSV = "./Assets/Dataset/1-hop/dataset.csv"
REVERSE_CSV = "./Assets/Dataset/1-hop/reverse.csv"
CHUNK_SIZE = 50000  # rows per CSV chunk; adjust based on available memory

# --- Load Movies ---
def load_movies():
    conn = sqlite3.connect(DB_NAME)
    total_inserted = 0
    for chunk in pd.read_csv(MOVIES_CSV, chunksize=CHUNK_SIZE):
        chunk.rename(columns={"subject": "MovieURI"}, inplace=True)
        chunk["MovieURI"] = chunk["MovieURI"].astype(str).str.strip()
        chunk.to_sql("Movies", conn, if_exists="append", index=False)
        total_inserted += len(chunk)
    print(f"Movies loaded: {total_inserted} rows inserted.")
    conn.close()

# --- Load Origins ---
def load_origins():
    conn = sqlite3.connect(DB_NAME)
    origins = pd.DataFrame({"OriginName": ["Dataset", "Reverse"]})
    origins.to_sql("Origins", conn, if_exists="append", index=False)
    conn.close()
    print("Origins loaded.")

# --- Load WikiPageIDs ---
def load_wikipageids():
    conn = sqlite3.connect(DB_NAME)
    movies_df = pd.read_sql_query("SELECT MovieID, MovieURI FROM Movies", conn)
    movies_df["MovieURI"] = movies_df["MovieURI"].astype(str).str.strip()

    total_inserted = 0
    for chunk in pd.read_csv(PAGEID_CSV, chunksize=CHUNK_SIZE):
        chunk.rename(columns={"subject": "MovieURI", "object": "PageID"}, inplace=True)
        chunk["MovieURI"] = chunk["MovieURI"].astype(str).str.strip()
        chunk["PageID"] = chunk["PageID"].astype(int)
        merged = chunk.merge(movies_df, on="MovieURI", how="inner")
        if not merged.empty:
            merged[["MovieID", "PageID"]].to_sql("WikiPageIDs", conn, if_exists="append", index=False)
            total_inserted += len(merged)
    print(f"WikiPageIDs loaded: {total_inserted} rows inserted.")
    conn.close()

# --- Load Wikipedia Abstracts ---
def load_abstracts():
    conn = sqlite3.connect(DB_NAME)
    # Get the MovieID mapping from WikiPageIDs
    pageid_df = pd.read_sql_query("SELECT MovieID, PageID FROM WikiPageIDs", conn)
    pageid_df["PageID"] = pageid_df["PageID"].astype(int)

    total_inserted = 0
    for chunk in pd.read_csv(SUMMARY_CSV, chunksize=CHUNK_SIZE):
        chunk.rename(columns={"subject": "PageID", "text": "Abstract"}, inplace=True)
        chunk["PageID"] = chunk["PageID"].astype(int)
        merged = chunk.merge(pageid_df, on="PageID", how="inner")
        if not merged.empty:
            merged[["MovieID", "Abstract"]].to_sql("WikipediaAbstracts", conn, if_exists="append", index=False)
            total_inserted += len(merged)
    if total_inserted == 0:
        print("No abstracts inserted: table WikipediaAbstracts is empty.")
    else:
        print(f"WikipediaAbstracts loaded: {total_inserted} rows inserted.")
    conn.close()
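# --- Optional sanity check for the input CSVs (illustrative helper, not called by default) ---
# The loaders assume the column names implied by the rename() calls in this script:
# movies.csv -> "subject"; movie-pageid.csv -> "subject", "object";
# wikipedia-summary.csv -> "subject", "text"; dataset.csv and reverse.csv ->
# "subject", "relationship", "object". This sketch only reads each header row and
# reports missing columns; adjust the expected sets if the real CSVs differ.
def check_csv_headers():
    expected = {
        MOVIES_CSV: {"subject"},
        PAGEID_CSV: {"subject", "object"},
        SUMMARY_CSV: {"subject", "text"},
        DATASET_CSV: {"subject", "relationship", "object"},
        REVERSE_CSV: {"subject", "relationship", "object"},
    }
    for path, columns in expected.items():
        header = set(pd.read_csv(path, nrows=0).columns)
        missing = columns - header
        if missing:
            print(f"{path}: missing expected columns {missing}")
        else:
            print(f"{path}: OK")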
# --- Helper: get or create an entry and return its ID ---
def get_or_create(cursor, table, column, value, origin_id=None):
    # Idempotent: existence is checked on the value alone, because the column is UNIQUE.
    # Table names end in "s", so table[:-1] + "ID" gives the key column (e.g. Subjects -> SubjectID).
    cursor.execute(f"SELECT {table[:-1]}ID FROM {table} WHERE {column}=?", (value,))
    result = cursor.fetchone()
    if result:
        return result[0]  # already present: no second insert
    if origin_id is not None:
        cursor.execute(f"INSERT INTO {table} ({column}, OriginID) VALUES (?, ?)", (value, origin_id))
    else:
        cursor.execute(f"INSERT INTO {table} ({column}) VALUES (?)", (value,))
    return cursor.lastrowid

# --- Load Dataset RDFs ---
def load_dataset():
    conn = sqlite3.connect(DB_NAME)
    cursor = conn.cursor()

    # --- Load Movies mapping ---
    movies_df = pd.read_sql_query("SELECT MovieID, MovieURI FROM Movies", conn)
    movies_df["MovieURI"] = movies_df["MovieURI"].astype(str).str.strip()

    # --- Get Dataset OriginID ---
    origin_id = pd.read_sql_query("SELECT OriginID FROM Origins WHERE OriginName='Dataset'", conn)["OriginID"].iloc[0]

    total_rdfs = 0

    for chunk in pd.read_csv(DATASET_CSV, chunksize=CHUNK_SIZE):
        chunk.rename(columns={"subject": "MovieURI", "relationship": "Relationship", "object": "ObjectURI"}, inplace=True)
        chunk["MovieURI"] = chunk["MovieURI"].astype(str).str.strip()
        chunk["ObjectURI"] = chunk["ObjectURI"].astype(str).str.strip()
        chunk["Relationship"] = chunk["Relationship"].astype(str).str.strip()

        # --- Merge to get MovieID ---
        merged = chunk.merge(movies_df, on="MovieURI", how="inner")  # MovieID / MovieURI (subject) / Relationship / ObjectURI
        if merged.empty:
            continue

        for _, row in merged.iterrows():  # each row is processed individually
            # Subjects: the movie itself is the subject (the "subject" column was renamed to MovieURI)
            subject_id = get_or_create(cursor, "Subjects", "SubjectURI", row["MovieURI"], origin_id)

            # Relationships
            relationship_id = get_or_create(cursor, "Relationships", "RelationshipURI", row["Relationship"])

            # Objects
            object_id = get_or_create(cursor, "Objects", "ObjectURI", row["ObjectURI"], origin_id)

            # RDFs: only insert if the triple doesn't already exist
            cursor.execute(
                "SELECT RDF_ID FROM RDFs WHERE SubjectID=? AND RelationshipID=? AND ObjectID=?",
                (subject_id, relationship_id, object_id)
            )
            if not cursor.fetchone():
                cursor.execute(
                    "INSERT INTO RDFs (MovieID, SubjectID, RelationshipID, ObjectID) VALUES (?, ?, ?, ?)",
                    (row["MovieID"], subject_id, relationship_id, object_id)
                )
                total_rdfs += 1

        conn.commit()

    print(f"Dataset RDFs loaded: {total_rdfs} rows inserted.")
    conn.close()

# --- Load Reverse RDFs ---
def load_reverse():
    conn = sqlite3.connect(DB_NAME)
    cursor = conn.cursor()

    # --- Load Movies mapping ---
    movies_df = pd.read_sql_query("SELECT MovieID, MovieURI FROM Movies", conn)
    movies_df["MovieURI"] = movies_df["MovieURI"].astype(str).str.strip()

    # --- Get Reverse OriginID ---
    origin_id = pd.read_sql_query("SELECT OriginID FROM Origins WHERE OriginName='Reverse'", conn)["OriginID"].iloc[0]

    total_rdfs = 0

    for chunk in pd.read_csv(REVERSE_CSV, chunksize=CHUNK_SIZE):
        chunk.rename(columns={"subject": "SubjectURI", "relationship": "Relationship", "object": "MovieURI"}, inplace=True)
        chunk["MovieURI"] = chunk["MovieURI"].astype(str).str.strip()
        chunk["SubjectURI"] = chunk["SubjectURI"].astype(str).str.strip()
        chunk["Relationship"] = chunk["Relationship"].astype(str).str.strip()

        # --- Merge to get MovieID ---
        merged = chunk.merge(movies_df, on="MovieURI", how="inner")
        if merged.empty:
            continue

        for _, row in merged.iterrows():
            # Subjects: from the Reverse CSV
            subject_id = get_or_create(cursor, "Subjects", "SubjectURI", row["SubjectURI"], origin_id)

            # Relationships
            relationship_id = get_or_create(cursor, "Relationships", "RelationshipURI", row["Relationship"])

            # Objects: the movie itself is the object
            object_id = get_or_create(cursor, "Objects", "ObjectURI", row["MovieURI"], origin_id)

            # RDFs: only insert if the triple doesn't already exist
            cursor.execute(
                "SELECT RDF_ID FROM RDFs WHERE SubjectID=? AND RelationshipID=? AND ObjectID=?",
                (subject_id, relationship_id, object_id)
            )
            if not cursor.fetchone():
                cursor.execute(
                    "INSERT INTO RDFs (MovieID, SubjectID, RelationshipID, ObjectID) VALUES (?, ?, ?, ?)",
                    (row["MovieID"], subject_id, relationship_id, object_id)
                )
                total_rdfs += 1

        conn.commit()

    print(f"Reverse RDFs loaded: {total_rdfs} rows inserted.")
    conn.close()


# --- Run the loaders (uncomment the steps that are needed) ---
# load_movies()
# load_origins()
# load_wikipageids()
# load_abstracts()
# load_dataset()
load_reverse()