Added Splitter to divide tokens from text

This commit is contained in:
Christian Risi 2025-09-28 18:03:16 +02:00
parent b071145f6e
commit d179e01971
2 changed files with 171 additions and 0 deletions

View File

@ -0,0 +1,40 @@
import re
from typing import Generator
from ..Enums import TokenType
class NanoSocratesSplitter:
def __init__(
self,
special_token_regex: re.Pattern
) -> None:
self.__special_token_regex = special_token_regex
def split_text(self, corpus: str) -> Generator[tuple[str, TokenType]]:
bpe_start = 0
bpe_end = len(corpus)
for bound_start, bound_end in self.__find_boundaries(corpus):
bpe_end = bound_start
BPE_TOKEN_TEXT = corpus[bpe_start:bpe_end]
if BPE_TOKEN_TEXT != "":
yield (BPE_TOKEN_TEXT, TokenType.BPE)
bpe_start = bound_end
SPECIAL_TOKEN_TEXT = corpus[bound_start:bound_end]
if SPECIAL_TOKEN_TEXT != "":
yield (SPECIAL_TOKEN_TEXT, TokenType.SPECIAL)
def __find_boundaries(self, corpus: str) -> Generator[tuple[int, int]]:
for match in self.__special_token_regex.finditer(corpus):
start = match.start()
end = match.end()
yield (start, end)

View File

@ -0,0 +1,131 @@
from Project_Model.Libs.BPE.Enums import TokenType
import Project_Model.Libs.BPE as BPE
import re
PATTERN = "<(TOKEN|SOT|SEP|EOT)>"
SYMBOL_REGEX = re.compile(PATTERN)
class TestSplitter:
def test_split(self):
TEXT = "<SOT>Lorem <SEP>"
SPLITTER = BPE.NanoSocratesSplitter(SYMBOL_REGEX)
EXPECTED_CHUNKS = [
("<SOT>", TokenType.SPECIAL),
("Lorem ", TokenType.BPE),
("<SEP>", TokenType.SPECIAL),
]
CHUNKS = list(SPLITTER.split_text(TEXT))
assert len(CHUNKS) == len(EXPECTED_CHUNKS)
for chunk, expected_chunk in zip(EXPECTED_CHUNKS, CHUNKS):
print(f"TEST:\n\tCHUNK:\t\t{chunk}\n\tEXPECTED:\t\t{expected_chunk}")
RECEIVED_TOKEN_STRING, RECEIVED_TOKEN_TYPE = chunk
EXPECTED_TOKEN_STRING, EXPECTED_TOKEN_TYPE = expected_chunk
assert RECEIVED_TOKEN_STRING == EXPECTED_TOKEN_STRING
assert RECEIVED_TOKEN_TYPE == EXPECTED_TOKEN_TYPE
def test_split_trailing_text(self):
TEXT = "ipsu<SEP>m d<SEP>olor"
SPLITTER = BPE.NanoSocratesSplitter(SYMBOL_REGEX)
EXPECTED_CHUNKS = [
("ipsu", TokenType.BPE),
("<SEP>", TokenType.SPECIAL),
("m d", TokenType.BPE),
("<SEP>", TokenType.SPECIAL),
]
CHUNKS = list(SPLITTER.split_text(TEXT))
assert len(CHUNKS) == len(EXPECTED_CHUNKS)
for chunk, expected_chunk in zip(EXPECTED_CHUNKS, CHUNKS):
print(f"TEST:\n\tCHUNK:\t\t{chunk}\n\tEXPECTED:\t\t{expected_chunk}")
RECEIVED_TOKEN_STRING, RECEIVED_TOKEN_TYPE = chunk
EXPECTED_TOKEN_STRING, EXPECTED_TOKEN_TYPE = expected_chunk
assert RECEIVED_TOKEN_STRING == EXPECTED_TOKEN_STRING
assert RECEIVED_TOKEN_TYPE == EXPECTED_TOKEN_TYPE
def test_split_multi_token(self):
TEXT = "ipsu<SEP>m d<SEP><SEP><SEP>dsg<SEP>olor"
SPLITTER = BPE.NanoSocratesSplitter(SYMBOL_REGEX)
EXPECTED_CHUNKS = [
("ipsu", TokenType.BPE),
("<SEP>", TokenType.SPECIAL),
("m d", TokenType.BPE),
("<SEP>", TokenType.SPECIAL),
("<SEP>", TokenType.SPECIAL),
("<SEP>", TokenType.SPECIAL),
("dsg", TokenType.BPE),
("<SEP>", TokenType.SPECIAL),
]
CHUNKS = list(SPLITTER.split_text(TEXT))
assert len(CHUNKS) == len(EXPECTED_CHUNKS)
for chunk, expected_chunk in zip(EXPECTED_CHUNKS, CHUNKS):
print(f"TEST:\n\tCHUNK:\t\t{chunk}\n\tEXPECTED:\t\t{expected_chunk}")
RECEIVED_TOKEN_STRING, RECEIVED_TOKEN_TYPE = chunk
EXPECTED_TOKEN_STRING, EXPECTED_TOKEN_TYPE = expected_chunk
assert RECEIVED_TOKEN_STRING == EXPECTED_TOKEN_STRING
assert RECEIVED_TOKEN_TYPE == EXPECTED_TOKEN_TYPE
def test_split_malformed_1(self):
TEXT = "<SEP>lerisque"
SPLITTER = BPE.NanoSocratesSplitter(SYMBOL_REGEX)
EXPECTED_CHUNKS = [
("<SEP>", TokenType.SPECIAL),
]
CHUNKS = list(SPLITTER.split_text(TEXT))
assert len(CHUNKS) == len(EXPECTED_CHUNKS)
for chunk, expected_chunk in zip(EXPECTED_CHUNKS, CHUNKS):
print(f"TEST:\n\tCHUNK:\t\t{chunk}\n\tEXPECTED:\t\t{expected_chunk}")
RECEIVED_TOKEN_STRING, RECEIVED_TOKEN_TYPE = chunk
EXPECTED_TOKEN_STRING, EXPECTED_TOKEN_TYPE = expected_chunk
assert RECEIVED_TOKEN_STRING == EXPECTED_TOKEN_STRING
assert RECEIVED_TOKEN_TYPE == EXPECTED_TOKEN_TYPE
def test_split_malformed_2(self):
TEXT = "lerisque"
SPLITTER = BPE.NanoSocratesSplitter(SYMBOL_REGEX)
EXPECTED_CHUNKS = []
CHUNKS = list(SPLITTER.split_text(TEXT))
assert len(CHUNKS) == len(EXPECTED_CHUNKS)
for chunk, expected_chunk in zip(EXPECTED_CHUNKS, CHUNKS):
print(f"TEST:\n\tCHUNK:\t\t{chunk}\n\tEXPECTED:\t\t{expected_chunk}")
RECEIVED_TOKEN_STRING, RECEIVED_TOKEN_TYPE = chunk
EXPECTED_TOKEN_STRING, EXPECTED_TOKEN_TYPE = expected_chunk
assert RECEIVED_TOKEN_STRING == EXPECTED_TOKEN_STRING
assert RECEIVED_TOKEN_TYPE == EXPECTED_TOKEN_TYPE