DataRetrieval update, without df

GassiGiuseppe 2025-09-20 23:32:08 +02:00
parent 8819b8e87f
commit c5439533e6


@@ -1,200 +1,249 @@
import sqlite3
import csv

# --- Global configuration ---
DB_NAME = "./Assets/Dataset/Tmp/dataset.db"
MOVIES_CSV = "./Assets/Dataset/1-hop/movies.csv"
PAGEID_CSV = "./Assets/Dataset/1-hop/movie-pageid.csv"
SUMMARY_CSV = "./Assets/Dataset/1-hop/wikipedia-summary.csv"
DATASET_CSV = "./Assets/Dataset/1-hop/dataset.csv"
REVERSE_CSV = "./Assets/Dataset/1-hop/reverse.csv"
# --- Helper: idempotent insert-or-select ---
def get_or_create(cursor, table, column, value, origin_id=None):
    # Try to insert the value, then fetch its ID (regardless of whether the
    # insert succeeded). Subjects and Objects need origin_id; Relationships do not.
    # try/except on INSERT keeps IDs contiguous (no AUTOINCREMENT jumps).
    try:
        if origin_id is not None:
            cursor.execute(
                f"INSERT INTO {table} ({column}, OriginID) VALUES (?, ?)",
                (value, int(origin_id)),
            )
        else:
            cursor.execute(
                f"INSERT INTO {table} ({column}) VALUES (?)",
                (value,),
            )
    except sqlite3.IntegrityError:
        # Row already exists: fall through to the SELECT below.
        pass
    # Always fetch the ID (whether new or existing).
    # {table[:-1]}ID ->
    #   Subjects      -> SubjectID
    #   Objects       -> ObjectID
    #   Relationships -> RelationshipID
    # (naming convention is hardcoded: table name minus the trailing "s", plus "ID")
    cursor.execute(f"SELECT {table[:-1]}ID FROM {table} WHERE {column}=?", (value,))
    return cursor.fetchone()[0]  # fetchone returns a one-element tuple
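
# Illustrative sketch, never called: a minimal check of get_or_create's
# idempotency against an in-memory database. The throwaway Subjects DDL and the
# _demo_get_or_create name are assumptions for the demo, not the real
# db_creation.sql schema.
def _demo_get_or_create():
    demo = sqlite3.connect(":memory:")
    cur = demo.cursor()
    cur.execute(
        "CREATE TABLE Subjects ("
        " SubjectID INTEGER PRIMARY KEY,"
        " SubjectURI TEXT UNIQUE NOT NULL,"
        " OriginID INTEGER)"
    )
    first = get_or_create(cur, "Subjects", "SubjectURI", "http://example.org/Movie_1", 1)
    second = get_or_create(cur, "Subjects", "SubjectURI", "http://example.org/Movie_1", 1)
    assert first == second  # same value -> same ID, no duplicate row
    demo.close()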
# --- Load Movies ---
def load_movies():
    # Fills Movies: MovieID [PK] / MovieURI
    # MovieID is managed by SQLite.
    conn = sqlite3.connect(DB_NAME)
    cur = conn.cursor()
    total = 0
    # MOVIES_CSV has a single column: "subject"
    with open(MOVIES_CSV, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            movie_uri = row["subject"].strip()
            # try/except on INSERT keeps IDs contiguous (no AUTOINCREMENT jumps)
            try:
                cur.execute("INSERT INTO Movies (MovieURI) VALUES (?)", (movie_uri,))
                total += 1  # count only newly inserted rows
            except sqlite3.IntegrityError:
                # already exists, skip
                pass
    conn.commit()  # suggested by dr
    conn.close()
    print(f"Movies loaded: {total}")
# --- Load Origins ---
def load_origins():
    # Fills Origins: OriginID [PK] / OriginName
    # Only two origins exist: "Dataset" and "Reverse".
    conn = sqlite3.connect(DB_NAME)
    cur = conn.cursor()
    for origin in ["Dataset", "Reverse"]:
        try:
            cur.execute("INSERT INTO Origins (OriginName) VALUES (?)", (origin,))
        except sqlite3.IntegrityError:
            pass
    conn.commit()
    conn.close()
    print("Origins loaded.")
# --- Load WikiPageIDs ---
def load_wikipageids():
    # Fills WikiPageIDs: MovieID [PK, FK] / PageID [IDX] (wiki)
    conn = sqlite3.connect(DB_NAME)
    cur = conn.cursor()
    total = 0
    # PAGEID_CSV: "subject","object" -> MovieURI, WikiPageID
    with open(PAGEID_CSV, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            movie_uri = row["subject"].strip()
            page_id = int(row["object"])
            cur.execute("SELECT MovieID FROM Movies WHERE MovieURI=?", (movie_uri,))
            movie = cur.fetchone()
            if movie:
                try:
                    # could become INSERT OR IGNORE instead of try/except
                    cur.execute(
                        "INSERT INTO WikiPageIDs (MovieID, PageID) VALUES (?, ?)",
                        (movie[0], page_id),
                    )
                    total += 1
                except sqlite3.IntegrityError:
                    pass
    conn.commit()
    conn.close()
    print(f"WikiPageIDs loaded: {total}")
# --- Load Wikipedia Abstracts ---
def load_abstracts():
    # Fills WikipediaAbstracts: MovieID [PK, FK] / Abstract
    conn = sqlite3.connect(DB_NAME)
    cur = conn.cursor()
    total = 0
    # SUMMARY_CSV: "subject","text" -> WikiPageID, Abstract
    with open(SUMMARY_CSV, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            page_id = int(row["subject"])
            abstract = row["text"].strip()
            # WikiPageIDs: MovieID [PK, FK] / PageID [IDX] (wiki)
            cur.execute("SELECT MovieID FROM WikiPageIDs WHERE PageID=?", (page_id,))
            movie = cur.fetchone()  # one-element tuple holding the MovieID
            if movie:
                try:
                    # could become INSERT OR IGNORE instead of try/except
                    cur.execute(
                        "INSERT INTO WikipediaAbstracts (MovieID, Abstract) VALUES (?, ?)",
                        (movie[0], abstract),
                    )
                    total += 1
                except sqlite3.IntegrityError as e:
                    print(e)
    conn.commit()
    conn.close()
    print(f"WikipediaAbstracts loaded: {total}")
# --- Load Dataset RDFs ---
def load_dataset():
    conn = sqlite3.connect(DB_NAME)
    cur = conn.cursor()
    # get origin_id for "Dataset"
    cur.execute("SELECT OriginID FROM Origins WHERE OriginName='Dataset'")
    origin_id = int(cur.fetchone()[0])
    print(f"Origin_id is: {origin_id}")
    total = 0
    skipped_movie = 0
    # DATASET_CSV: "subject","relationship","object" -> MovieURI, RelationshipURI, ObjectURI
    with open(DATASET_CSV, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            movie_uri = row["subject"].strip()
            relationship_uri = row["relationship"].strip()
            object_uri = row["object"].strip()
            # guard: if no MovieID exists for this MovieURI, skip the row
            cur.execute("SELECT MovieID FROM Movies WHERE MovieURI=?", (movie_uri,))
            movie = cur.fetchone()
            if not movie:
                skipped_movie += 1
                continue
            # map each URI to its own table and retrieve the IDs instead;
            # here the movie itself is the subject of the triple
            subject_id = get_or_create(cur, "Subjects", "SubjectURI", movie_uri, origin_id)
            relationship_id = get_or_create(cur, "Relationships", "RelationshipURI", relationship_uri)
            object_id = get_or_create(cur, "Objects", "ObjectURI", object_uri, origin_id)
            # check whether the triple is already in RDFs
            cur.execute(
                "SELECT RDF_ID FROM RDFs WHERE SubjectID=? AND RelationshipID=? AND ObjectID=?",
                (subject_id, relationship_id, object_id),
            )
            if not cur.fetchone():
                cur.execute(
                    "INSERT INTO RDFs (MovieID, SubjectID, RelationshipID, ObjectID) VALUES (?, ?, ?, ?)",
                    (movie[0], subject_id, relationship_id, object_id),
                )
                total += 1
    conn.commit()
    conn.close()
    print(f"Dataset RDFs loaded: {total}")
    print(f"Skipped Movies: {skipped_movie}")
# --- Load Reverse RDFs ---
def load_reverse():
    conn = sqlite3.connect(DB_NAME)
    cur = conn.cursor()
    # get origin_id for "Reverse"
    cur.execute("SELECT OriginID FROM Origins WHERE OriginName='Reverse'")
    origin_id = int(cur.fetchone()[0])
    print(f"Origin_id is: {origin_id}")
    total = 0
    skipped_movie = 0
    # REVERSE_CSV: "subject","relationship","object" -> SubjectURI, RelationshipURI, MovieURI
    with open(REVERSE_CSV, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            subject_uri = row["subject"].strip()
            relationship_uri = row["relationship"].strip()
            movie_uri = row["object"].strip()
            # guard: if no MovieID exists for this MovieURI, skip the row
            cur.execute("SELECT MovieID FROM Movies WHERE MovieURI=?", (movie_uri,))
            movie = cur.fetchone()
            if not movie:
                skipped_movie += 1
                continue
            # map each URI to its own table and retrieve the IDs instead;
            # here the movie itself is the object of the triple
            subject_id = get_or_create(cur, "Subjects", "SubjectURI", subject_uri, origin_id)
            relationship_id = get_or_create(cur, "Relationships", "RelationshipURI", relationship_uri)
            object_id = get_or_create(cur, "Objects", "ObjectURI", movie_uri, origin_id)
            # check whether the triple is already in RDFs
            cur.execute(
                "SELECT RDF_ID FROM RDFs WHERE SubjectID=? AND RelationshipID=? AND ObjectID=?",
                (subject_id, relationship_id, object_id),
            )
            if not cur.fetchone():
                cur.execute(
                    "INSERT INTO RDFs (MovieID, SubjectID, RelationshipID, ObjectID) VALUES (?, ?, ?, ?)",
                    (movie[0], subject_id, relationship_id, object_id),
                )
                total += 1
    conn.commit()
    conn.close()
    print(f"Reverse RDFs loaded: {total}")
    print(f"Skipped Movies: {skipped_movie}")
# --- Execution order ---
load_movies()
load_origins()
load_wikipageids()
load_abstracts()
load_dataset()
load_reverse()
# sqlite3 ./Assets/Dataset/Tmp/dataset.db < ./Scripts/DataCleaning/SQL_Queries/db_creation.sql
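
# For reference, a guessed minimal schema matching the statements in this
# script. The authoritative DDL lives in db_creation.sql (not shown here), so
# every line below is an assumption derived from the INSERT/SELECT statements
# above; the _schema_sketch name is hypothetical and the function is never called.
def _schema_sketch(conn):
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS Movies (
            MovieID  INTEGER PRIMARY KEY,
            MovieURI TEXT UNIQUE NOT NULL
        );
        CREATE TABLE IF NOT EXISTS Origins (
            OriginID   INTEGER PRIMARY KEY,
            OriginName TEXT UNIQUE NOT NULL
        );
        CREATE TABLE IF NOT EXISTS WikiPageIDs (
            MovieID INTEGER PRIMARY KEY REFERENCES Movies(MovieID),
            PageID  INTEGER
        );
        CREATE INDEX IF NOT EXISTS idx_wikipageids_pageid ON WikiPageIDs(PageID);
        CREATE TABLE IF NOT EXISTS WikipediaAbstracts (
            MovieID  INTEGER PRIMARY KEY REFERENCES Movies(MovieID),
            Abstract TEXT
        );
        CREATE TABLE IF NOT EXISTS Subjects (
            SubjectID  INTEGER PRIMARY KEY,
            SubjectURI TEXT UNIQUE NOT NULL,
            OriginID   INTEGER REFERENCES Origins(OriginID)
        );
        CREATE TABLE IF NOT EXISTS Relationships (
            RelationshipID  INTEGER PRIMARY KEY,
            RelationshipURI TEXT UNIQUE NOT NULL
        );
        CREATE TABLE IF NOT EXISTS Objects (
            ObjectID  INTEGER PRIMARY KEY,
            ObjectURI TEXT UNIQUE NOT NULL,
            OriginID  INTEGER REFERENCES Origins(OriginID)
        );
        CREATE TABLE IF NOT EXISTS RDFs (
            RDF_ID         INTEGER PRIMARY KEY,
            MovieID        INTEGER REFERENCES Movies(MovieID),
            SubjectID      INTEGER REFERENCES Subjects(SubjectID),
            RelationshipID INTEGER REFERENCES Relationships(RelationshipID),
            ObjectID       INTEGER REFERENCES Objects(ObjectID)
        );
    """)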