import sqlite3
import pandas as pd

# --- Global configuration ---
DB_NAME = "./Assets/Dataset/DatawareHouse/dataset.db"
MOVIES_CSV = "./Assets/Dataset/1-hop/movies.csv"
PAGEID_CSV = "./Assets/Dataset/1-hop/movie-pageid.csv"
SUMMARY_CSV = "./Assets/Dataset/1-hop/wikipedia-summary.csv"
DATASET_CSV = "./Assets/Dataset/1-hop/dataset.csv"
REVERSE_CSV = "./Assets/Dataset/1-hop/reverse.csv"
CHUNK_SIZE = 50000  # rows per CSV chunk; adjust based on available memory

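# The loaders below assume all warehouse tables already exist. The sketch
# below is NOT the project's real DDL: it is a minimal schema inferred from
# the queries in this file (UNIQUE on the URI columns matches the assumption
# documented in get_or_create() further down), included only to record the
# expected layout.
def ensure_schema():
    conn = sqlite3.connect(DB_NAME)
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS Movies (
            MovieID INTEGER PRIMARY KEY,
            MovieURI TEXT UNIQUE
        );
        CREATE TABLE IF NOT EXISTS Origins (
            OriginID INTEGER PRIMARY KEY,
            OriginName TEXT UNIQUE
        );
        CREATE TABLE IF NOT EXISTS WikiPageIDs (
            MovieID INTEGER,
            PageID INTEGER
        );
        CREATE TABLE IF NOT EXISTS WikipediaAbstracts (
            MovieID INTEGER,
            Abstract TEXT
        );
        CREATE TABLE IF NOT EXISTS Subjects (
            SubjectID INTEGER PRIMARY KEY,
            SubjectURI TEXT UNIQUE,
            OriginID INTEGER
        );
        CREATE TABLE IF NOT EXISTS Relationships (
            RelationshipID INTEGER PRIMARY KEY,
            RelationshipURI TEXT UNIQUE
        );
        CREATE TABLE IF NOT EXISTS Objects (
            ObjectID INTEGER PRIMARY KEY,
            ObjectURI TEXT UNIQUE,
            OriginID INTEGER
        );
        CREATE TABLE IF NOT EXISTS RDFs (
            RDF_ID INTEGER PRIMARY KEY,
            MovieID INTEGER,
            SubjectID INTEGER,
            RelationshipID INTEGER,
            ObjectID INTEGER
        );
    """)
    conn.commit()
    conn.close()
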
# --- Load Movies ---
def load_movies():
    conn = sqlite3.connect(DB_NAME)
    total_inserted = 0
    for chunk in pd.read_csv(MOVIES_CSV, chunksize=CHUNK_SIZE):
        # Normalize the CSV "subject" column into Movies.MovieURI.
        chunk.rename(columns={"subject": "MovieURI"}, inplace=True)
        chunk["MovieURI"] = chunk["MovieURI"].astype(str).str.strip()
        chunk.to_sql("Movies", conn, if_exists="append", index=False)
        total_inserted += len(chunk)
    print(f"Movies loaded: {total_inserted} rows inserted.")
    conn.close()

# --- Load Origins ---
def load_origins():
    conn = sqlite3.connect(DB_NAME)
    origins = pd.DataFrame({"OriginName": ["Dataset", "Reverse"]})
    origins.to_sql("Origins", conn, if_exists="append", index=False)
    conn.close()
    print("Origins loaded.")

# --- Load WikiPageIDs ---
def load_wikipageids():
    conn = sqlite3.connect(DB_NAME)
    # Build a MovieURI -> MovieID lookup from the already-loaded Movies table.
    movies_df = pd.read_sql_query("SELECT MovieID, MovieURI FROM Movies", conn)
    movies_df["MovieURI"] = movies_df["MovieURI"].astype(str).str.strip()

    total_inserted = 0
    for chunk in pd.read_csv(PAGEID_CSV, chunksize=CHUNK_SIZE):
        chunk.rename(columns={"subject": "MovieURI", "object": "PageID"}, inplace=True)
        chunk["MovieURI"] = chunk["MovieURI"].astype(str).str.strip()
        chunk["PageID"] = chunk["PageID"].astype(int)
        # Inner join keeps only page IDs whose movie exists in Movies.
        merged = chunk.merge(movies_df, on="MovieURI", how="inner")
        if not merged.empty:
            merged[["MovieID", "PageID"]].to_sql("WikiPageIDs", conn, if_exists="append", index=False)
            total_inserted += len(merged)
    print(f"WikiPageIDs loaded: {total_inserted} rows inserted.")
    conn.close()

# --- Load Wikipedia Abstracts ---
def load_abstracts():
    conn = sqlite3.connect(DB_NAME)
    # Get the MovieID mapping from WikiPageIDs.
    pageid_df = pd.read_sql_query("SELECT MovieID, PageID FROM WikiPageIDs", conn)
    pageid_df["PageID"] = pageid_df["PageID"].astype(int)

    total_inserted = 0
    for chunk in pd.read_csv(SUMMARY_CSV, chunksize=CHUNK_SIZE):
        chunk.rename(columns={"subject": "PageID", "text": "Abstract"}, inplace=True)
        chunk["PageID"] = chunk["PageID"].astype(int)
        merged = chunk.merge(pageid_df, on="PageID", how="inner")
        if not merged.empty:
            merged[["MovieID", "Abstract"]].to_sql("WikipediaAbstracts", conn, if_exists="append", index=False)
            total_inserted += len(merged)
    if total_inserted == 0:
        print("No abstracts inserted: table WikipediaAbstracts is empty.")
    else:
        print(f"WikipediaAbstracts loaded: {total_inserted} rows inserted.")
    conn.close()

# --- Helper: get or create an entry and return its ID ---
def get_or_create(cursor, table, column, value, origin_id=None):
    # Idempotent: existence is checked on the value alone (the column is
    # UNIQUE), so re-running a load never inserts a duplicate.
    # The ID column name is derived by singularizing the table name,
    # e.g. "Subjects" -> "SubjectID".
    cursor.execute(f"SELECT {table[:-1]}ID FROM {table} WHERE {column}=?", (value,))
    result = cursor.fetchone()
    if result:
        return result[0]  # already exists: reuse its ID, no second insert
    if origin_id is not None:
        cursor.execute(f"INSERT INTO {table} ({column}, OriginID) VALUES (?, ?)", (value, origin_id))
    else:
        cursor.execute(f"INSERT INTO {table} ({column}) VALUES (?)", (value,))
    return cursor.lastrowid

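# Usage sketch (illustrative variable names, mirroring the loaders below):
#   subject_id = get_or_create(cursor, "Subjects", "SubjectURI", uri, origin_id)
#   rel_id = get_or_create(cursor, "Relationships", "RelationshipURI", rel_uri)
# Note that the table and column names are interpolated into the SQL string,
# so only hard-coded identifiers should be passed, as the loaders do; the
# values themselves always go through parameter binding.
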
# --- Load Dataset RDFs ---
def load_dataset():
    conn = sqlite3.connect(DB_NAME)
    cursor = conn.cursor()

    # --- Load Movies mapping ---
    movies_df = pd.read_sql_query("SELECT MovieID, MovieURI FROM Movies", conn)
    movies_df["MovieURI"] = movies_df["MovieURI"].astype(str).str.strip()

    # --- Get Dataset OriginID ---
    origin_id = pd.read_sql_query("SELECT OriginID FROM Origins WHERE OriginName='Dataset'", conn)["OriginID"].iloc[0]

    total_rdfs = 0

    for chunk in pd.read_csv(DATASET_CSV, chunksize=CHUNK_SIZE):
        chunk.rename(columns={"subject": "MovieURI", "relationship": "Relationship", "object": "ObjectURI"}, inplace=True)
        chunk["MovieURI"] = chunk["MovieURI"].astype(str).str.strip()
        chunk["ObjectURI"] = chunk["ObjectURI"].astype(str).str.strip()
        chunk["Relationship"] = chunk["Relationship"].astype(str).str.strip()

        # --- Merge to get MovieID: MovieID / MovieURI / Relationship / ObjectURI ---
        merged = chunk.merge(movies_df, on="MovieURI", how="inner")
        if merged.empty:
            continue

        # Each row is processed individually so get_or_create can
        # deduplicate subjects, relationships, and objects as it goes.
        for _, row in merged.iterrows():
            # Subjects: the movie itself is the subject (the CSV "subject"
            # column was renamed to MovieURI above)
            subject_id = get_or_create(cursor, "Subjects", "SubjectURI", row["MovieURI"], origin_id)

            # Relationships
            relationship_id = get_or_create(cursor, "Relationships", "RelationshipURI", row["Relationship"])

            # Objects
            object_id = get_or_create(cursor, "Objects", "ObjectURI", row["ObjectURI"], origin_id)

            # RDFs: only insert if the triple doesn't already exist
            cursor.execute(
                "SELECT RDF_ID FROM RDFs WHERE SubjectID=? AND RelationshipID=? AND ObjectID=?",
                (subject_id, relationship_id, object_id)
            )
            if not cursor.fetchone():
                cursor.execute(
                    "INSERT INTO RDFs (MovieID, SubjectID, RelationshipID, ObjectID) VALUES (?, ?, ?, ?)",
                    (row["MovieID"], subject_id, relationship_id, object_id)
                )
                total_rdfs += 1

        # Commit once per chunk to bound the open transaction size.
        conn.commit()

    print(f"Dataset RDFs loaded: {total_rdfs} rows inserted.")
    conn.close()

# --- Load Reverse RDFs ---
def load_reverse():
    conn = sqlite3.connect(DB_NAME)
    cursor = conn.cursor()

    # --- Load Movies mapping ---
    movies_df = pd.read_sql_query("SELECT MovieID, MovieURI FROM Movies", conn)
    movies_df["MovieURI"] = movies_df["MovieURI"].astype(str).str.strip()

    # --- Get Reverse OriginID ---
    origin_id = pd.read_sql_query("SELECT OriginID FROM Origins WHERE OriginName='Reverse'", conn)["OriginID"].iloc[0]

    total_rdfs = 0

    for chunk in pd.read_csv(REVERSE_CSV, chunksize=CHUNK_SIZE):
        # In the reverse file the movie is the *object* of the triple.
        chunk.rename(columns={"subject": "SubjectURI", "relationship": "Relationship", "object": "MovieURI"}, inplace=True)
        chunk["MovieURI"] = chunk["MovieURI"].astype(str).str.strip()
        chunk["SubjectURI"] = chunk["SubjectURI"].astype(str).str.strip()
        chunk["Relationship"] = chunk["Relationship"].astype(str).str.strip()

        # --- Merge to get MovieID ---
        merged = chunk.merge(movies_df, on="MovieURI", how="inner")
        if merged.empty:
            continue

        for _, row in merged.iterrows():
            # Subjects: taken from the reverse CSV
            subject_id = get_or_create(cursor, "Subjects", "SubjectURI", row["SubjectURI"], origin_id)

            # Relationships
            relationship_id = get_or_create(cursor, "Relationships", "RelationshipURI", row["Relationship"])

            # Objects: the movie itself as ObjectURI
            object_id = get_or_create(cursor, "Objects", "ObjectURI", row["MovieURI"], origin_id)

            # RDFs: only insert if the triple doesn't already exist
            cursor.execute(
                "SELECT RDF_ID FROM RDFs WHERE SubjectID=? AND RelationshipID=? AND ObjectID=?",
                (subject_id, relationship_id, object_id)
            )
            if not cursor.fetchone():
                cursor.execute(
                    "INSERT INTO RDFs (MovieID, SubjectID, RelationshipID, ObjectID) VALUES (?, ?, ?, ?)",
                    (row["MovieID"], subject_id, relationship_id, object_id)
                )
                total_rdfs += 1

        # Commit once per chunk to bound the open transaction size.
        conn.commit()

    print(f"Reverse RDFs loaded: {total_rdfs} rows inserted.")
    conn.close()

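# Optional sanity check, not part of the original load pipeline: a minimal
# read-only sketch that joins RDFs back to its lookup tables and prints a few
# reconstructed triples (preview_triples and its limit are illustrative).
def preview_triples(limit=5):
    conn = sqlite3.connect(DB_NAME)
    rows = conn.execute(
        """
        SELECT s.SubjectURI, r.RelationshipURI, o.ObjectURI
        FROM RDFs t
        JOIN Subjects s ON s.SubjectID = t.SubjectID
        JOIN Relationships r ON r.RelationshipID = t.RelationshipID
        JOIN Objects o ON o.ObjectID = t.ObjectID
        LIMIT ?
        """,
        (limit,),
    ).fetchall()
    conn.close()
    for row in rows:
        print(row)
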
# --- Run the load steps in order; steps that have already completed are
# --- left commented out ---
# load_movies()
# load_origins()
# load_wikipageids()
# load_abstracts()
# load_dataset()
load_reverse()