diff --git a/Project_Model/Libs/BPE/Classes/NanoSocratesSplitter.py b/Project_Model/Libs/BPE/Classes/NanoSocratesSplitter.py
new file mode 100644
--- /dev/null
+++ b/Project_Model/Libs/BPE/Classes/NanoSocratesSplitter.py
@@ -0,0 +1,39 @@
+import re
+from typing import Generator
+
+from ..Enums import TokenType
+
+
+class NanoSocratesSplitter:
+    """Split a corpus into BPE text chunks and special-token chunks."""
+
+    def __init__(self, special_token_regex: re.Pattern) -> None:
+        self.__special_token_regex = special_token_regex
+
+    def split_text(
+        self, corpus: str
+    ) -> Generator[tuple[str, TokenType], None, None]:
+        """Yield (text, TokenType) pairs for *corpus*.
+
+        Text between special-token matches is yielded as a BPE chunk and
+        each special-token match as a SPECIAL chunk.  Empty chunks are
+        skipped.  Text after the LAST special token is intentionally
+        dropped (the existing tests pin this behavior).
+        """
+        # NOTE: typing.Generator needs all three parameters before
+        # Python 3.13; Generator[tuple[str, TokenType]] raises at import.
+        bpe_start = 0
+
+        for bound_start, bound_end in self.__find_boundaries(corpus):
+            bpe_text = corpus[bpe_start:bound_start]
+            if bpe_text != "":
+                yield (bpe_text, TokenType.BPE)
+
+            bpe_start = bound_end
+            special_text = corpus[bound_start:bound_end]
+            if special_text != "":
+                yield (special_text, TokenType.SPECIAL)
+
+    def __find_boundaries(
+        self, corpus: str
+    ) -> Generator[tuple[int, int], None, None]:
+        """Yield (start, end) index pairs of every special-token match."""
+        for match in self.__special_token_regex.finditer(corpus):
+            yield (match.start(), match.end())
diff --git a/Project_Model/Tests/splitter_test.py b/Project_Model/Tests/splitter_test.py
new file mode 100644
--- /dev/null
+++ b/Project_Model/Tests/splitter_test.py
@@ -0,0 +1,96 @@
+import re
+
+from Project_Model.Libs.BPE.Enums import TokenType
+import Project_Model.Libs.BPE as BPE
+
+
+PATTERN = r"<(TOKEN|SOT|SEP|EOT)>"
+SYMBOL_REGEX = re.compile(PATTERN)
+
+
+def _assert_chunks(received, expected):
+    """Compare received (string, TokenType) chunks against expected ones."""
+    assert len(received) == len(expected)
+    for received_chunk, expected_chunk in zip(received, expected):
+        print(f"TEST:\n\tCHUNK:\t\t{received_chunk}\n\tEXPECTED:\t\t{expected_chunk}")
+        received_string, received_type = received_chunk
+        expected_string, expected_type = expected_chunk
+        assert received_string == expected_string
+        assert received_type == expected_type
+
+
+# NOTE(review): every "<...>" literal below was eaten by markup-stripping in
+# the original paste.  They are reconstructed from the regex pattern and the
+# surviving BPE fragments (which reassemble the garbled TEXT literals
+# exactly); the specific token chosen at each position is an assumption —
+# confirm against the original intent.
+class TestSplitter:
+
+    def test_split(self):
+
+        TEXT = "<SOT>Lorem <EOT>"
+
+        SPLITTER = BPE.NanoSocratesSplitter(SYMBOL_REGEX)
+
+        EXPECTED_CHUNKS = [
+            ("<SOT>", TokenType.SPECIAL),
+            ("Lorem ", TokenType.BPE),
+            ("<EOT>", TokenType.SPECIAL),
+        ]
+
+        _assert_chunks(list(SPLITTER.split_text(TEXT)), EXPECTED_CHUNKS)
+
+    def test_split_trailing_text(self):
+        # Trailing text after the last special token is dropped by design.
+        TEXT = "ipsu<SOT>m d<EOT>olor"
+
+        SPLITTER = BPE.NanoSocratesSplitter(SYMBOL_REGEX)
+
+        EXPECTED_CHUNKS = [
+            ("ipsu", TokenType.BPE),
+            ("<SOT>", TokenType.SPECIAL),
+            ("m d", TokenType.BPE),
+            ("<EOT>", TokenType.SPECIAL),
+        ]
+
+        _assert_chunks(list(SPLITTER.split_text(TEXT)), EXPECTED_CHUNKS)
+
+    def test_split_multi_token(self):
+
+        TEXT = "ipsu<SOT>m d<TOKEN><SEP><TOKEN>dsg<EOT>olor"
+
+        SPLITTER = BPE.NanoSocratesSplitter(SYMBOL_REGEX)
+
+        EXPECTED_CHUNKS = [
+            ("ipsu", TokenType.BPE),
+            ("<SOT>", TokenType.SPECIAL),
+            ("m d", TokenType.BPE),
+            ("<TOKEN>", TokenType.SPECIAL),
+            ("<SEP>", TokenType.SPECIAL),
+            ("<TOKEN>", TokenType.SPECIAL),
+            ("dsg", TokenType.BPE),
+            ("<EOT>", TokenType.SPECIAL),
+        ]
+
+        _assert_chunks(list(SPLITTER.split_text(TEXT)), EXPECTED_CHUNKS)
+
+    def test_split_malformed_1(self):
+        # A lone special token followed by text: the trailing text is dropped.
+        TEXT = "<EOT>lerisque"
+
+        SPLITTER = BPE.NanoSocratesSplitter(SYMBOL_REGEX)
+
+        EXPECTED_CHUNKS = [
+            ("<EOT>", TokenType.SPECIAL),
+        ]
+
+        _assert_chunks(list(SPLITTER.split_text(TEXT)), EXPECTED_CHUNKS)
+
+    def test_split_malformed_2(self):
+        # A tag that is not a known special token never matches, so the whole
+        # corpus counts as trailing text and nothing is yielded.
+        TEXT = "<WRONG>lerisque"
+
+        SPLITTER = BPE.NanoSocratesSplitter(SYMBOL_REGEX)
+
+        EXPECTED_CHUNKS = []
+
+        _assert_chunks(list(SPLITTER.split_text(TEXT)), EXPECTED_CHUNKS)