#######################################################
#  This file stands as the endpoint to interact with  #
#  the database                                       #
#######################################################

# import sqlite3  # DEPRECATED: the raw sqlite3 connection is no longer used
from typing import Iterable, Iterator, Optional

import pandas as pd
from sqlalchemy import create_engine

from Scripts.Libs.CleaningPipeline.special_token import SpecialToken


class SqlEndpoint():
    """Read-only access layer that runs SQL queries and returns DataFrames.

    Every query is executed through ``pandas.read_sql_query`` on the
    SQLAlchemy engine created in ``__init__``.
    """

    # Movies yielded by ``get_abbreviated_dataset_by_movie_id`` when the
    # caller does not pass an explicit selection.
    _DEFAULT_MOVIE_IDS = (
        117248,  # The Dark Knight
        147074,  # Inception
        113621,  # The Avengers
        1123,    # Cast Away
        117586,  # The Departed
        90177,   # American Psycho
        71587,   # Avatar
        138952,  # Django Unchained
        144137,  # Spirited Away
        148025,  # Knives Out
    )

    def __init__(self, DB_PATH="./Assets/Dataset/DatawareHouse/dataset.db", chunk_size_row=500):
        """Create the SQLAlchemy engine for the SQLite database.

        Args:
            DB_PATH: Path of the SQLite file, interpolated into the engine URL.
            chunk_size_row: Number of rows per DataFrame in the chunked getters.
        """
        # self.CONN = sqlite3.connect(DB_PATH)  # DEPRECATED
        # URL form: "sqlite:///" (3 slashes) -> relative path,
        #           "sqlite:////" (4 slashes) -> absolute path.
        self.sql_engine = create_engine(f"sqlite:///{DB_PATH}")
        # self.conn = self.sql_engine.connect().execution_options(stream_results=True)
        # It seems that SQLite does not support a streaming cursor.
        # PRAGMA executes better in writing, not reading.
        self.chunk_size_row = chunk_size_row

    def get_RDF(self) -> pd.DataFrame:
        """Load every RDF triple joined with its subject, relationship and object.

        Returns:
            pd.DataFrame: MovieID, SubjectURI, RelationshipURI, ObjectURI
        """
        QUERY = """
            SELECT MovieID, SubjectURI, RelationshipURI, ObjectURI
            FROM RDFs
            INNER JOIN Subjects USING (SubjectID)
            INNER JOIN Relationships USING (RelationshipID)
            INNER JOIN Objects USING (ObjectID);
        """
        # The old ``self.CONN`` attribute is never created any more (see
        # __init__), so the query has to go through the engine like the others.
        return pd.read_sql_query(QUERY, self.sql_engine)

    def get_chunked_abbreviated_dataset(self) -> Iterator[pd.DataFrame]:
        """Stream the dataset built from the Parsed* tables in chunks.

        Returns:
            Iterator[pd.DataFrame]: frames of at most ``chunk_size_row`` rows
            with columns MovieID, SubjectURI, RelationshipURI, ObjectURI,
            Abstract
        """
        QUERY = """
            SELECT MovieID, SubjectURI, RelationshipURI, ObjectURI, Abstract
            FROM RDFs
            INNER JOIN ParsedSubjects USING (SubjectID)
            INNER JOIN ParsedRelationships USING (RelationshipID)
            INNER JOIN ParsedObjects USING (ObjectID)
            INNER JOIN WikipediaAbstracts USING (MovieID);
        """
        # return pd.read_sql_query(QUERY, self.CONN, chunksize=500)  # sqlite3
        return pd.read_sql_query(QUERY, self.sql_engine, chunksize=self.chunk_size_row)

    def get_chunked_abbreviated_dataset_with_start_token(self) -> Iterator[pd.DataFrame]:
        """DEPRECATED: chunked dataset with a token prepended to each URI column.

        Returns:
            Iterator[pd.DataFrame]: frames of at most ``chunk_size_row`` rows
            with columns MovieID, SubjectURI, RelationshipURI, ObjectURI,
            Abstract
        """
        # DEPRECATED !
        # NOTE(review): the query below has three "?" placeholders but no
        # ``params`` are passed to ``read_sql_query`` and ``start_token`` is
        # never used, so execution is expected to fail on unbound parameters.
        # The token values to bind are not visible from this file; confirm
        # against SpecialToken before reviving this method.
        start_token = SpecialToken()
        QUERY = """
            SELECT
                MovieID,
                ? || SubjectURI AS SubjectURI,
                ? || RelationshipURI AS RelationshipURI,
                ? || ObjectURI AS ObjectURI,
                Abstract
            FROM RDFs
            INNER JOIN ParsedSubjects USING (SubjectID)
            INNER JOIN ParsedRelationships USING (RelationshipID)
            INNER JOIN ParsedObjects USING (ObjectID)
            INNER JOIN WikipediaAbstracts USING (MovieID);
        """
        return pd.read_sql_query(QUERY, self.sql_engine, chunksize=self.chunk_size_row)

    def get_abbreviated_dataset_by_movie_id(
        self, movie_ids: Optional[Iterable[int]] = None
    ) -> Iterator[pd.DataFrame]:
        """Yield one DataFrame per movie, holding all of that movie's rows.

        The query reads from the Parsed* tables, i.e. the RDFs already
        abbreviated by the SQL parser. Chunking by MovieID keeps the
        abstract identical inside each yielded frame.

        Args:
            movie_ids: Integer MovieIDs to fetch, in iteration order. When
                omitted, the curated ``_DEFAULT_MOVIE_IDS`` selection is used.

        Yields:
            pd.DataFrame: MovieID, SubjectURI, RelationshipURI, ObjectURI,
            Abstract
        """
        if movie_ids is None:
            movie_ids = self._DEFAULT_MOVIE_IDS

        QUERY = """
            SELECT MovieID, SubjectURI, RelationshipURI, ObjectURI, Abstract
            FROM RDFs
            INNER JOIN ParsedSubjects USING (SubjectID)
            INNER JOIN ParsedRelationships USING (RelationshipID)
            INNER JOIN ParsedObjects USING (ObjectID)
            INNER JOIN WikipediaAbstracts USING (MovieID)
            WHERE MovieID = (?);
        """

        for movie_id in movie_ids:
            # Cast to a plain int so IDs taken from a pandas/numpy column can
            # be bound by the SQLite driver as well.
            yield pd.read_sql_query(QUERY, self.sql_engine, params=(int(movie_id),))

    def get_movies_id_count(self) -> pd.DataFrame:
        """Count the RDF rows of each movie in the dataset.

        Returns:
            pd.DataFrame: MovieID, Count
        """
        QUERY = """
            SELECT MovieID, COUNT(*) AS Count
            FROM RDFs
            GROUP BY MovieID;
        """
        return pd.read_sql_query(QUERY, self.sql_engine)

    def get_relationship_count(self) -> pd.DataFrame:
        """Count the RDF rows of each relationship in the dataset.

        Returns:
            pd.DataFrame: RelationshipURI, Count
        """
        QUERY = """
            SELECT RelationshipURI, COUNT(*) AS Count
            FROM RDFs
            INNER JOIN ParsedRelationships USING (RelationshipID)
            GROUP BY RelationshipURI;
        """
        return pd.read_sql_query(QUERY, self.sql_engine)


if __name__ == "__main__":
    # Manual smoke run: print the per-movie frames of the default selection.
    sql_endpoint = SqlEndpoint()
    for movie_frame in sql_endpoint.get_abbreviated_dataset_by_movie_id():
        print(movie_frame)
    # sql_endpoint.get_RDF()
    print("done")