Update NanoSocratesBPE: corrected a minor bug about dictionary length,

added some comments to make the code clearer
This commit is contained in:
GassiGiuseppe 2025-10-03 00:57:19 +02:00
parent a1d143187d
commit 7c935d2700

View File

@ -2,7 +2,10 @@ from collections import deque
from .Encoder import Encoder from .Encoder import Encoder
from ..Errors import OutOfDictionaryException, DuplicateWordException from ..Errors import OutOfDictionaryException, DuplicateWordException
# ABOUT THE DICTIONARY:
# the string is converted into UTF-8 bytes, that is: each char is represented by a sequence of 1 to 4 bytes.
# each byte is cast to an integer, so that if an integer has a value lower than 256,
# then it represents a UTF-8 byte, otherwise it is a token ID.
class NanoSocratesBatchMemoryBPE: class NanoSocratesBatchMemoryBPE:
""" Memory to batch training. Keeps token couple frequencies, and merge_treshold """ Memory to batch training. Keeps token couple frequencies, and merge_treshold
""" """
@ -31,6 +34,7 @@ class NanoSocratesBPE(Encoder):
for key, value in vocabulary.items(): for key, value in vocabulary.items():
if value < 256: if value < 256:
raise OutOfDictionaryException() raise OutOfDictionaryException()
# values under 256 are used for unpaired char
# TODO: check if they are in order # TODO: check if they are in order
self.__vocabulary[key] = value self.__vocabulary[key] = value
self.__reverse_vocabulary[value] = key self.__reverse_vocabulary[value] = key
@ -38,7 +42,7 @@ class NanoSocratesBPE(Encoder):
@property @property
def vocabulary_size(self): def vocabulary_size(self):
return len(self.__vocabulary) + 255 return len(self.__vocabulary) + 256
@property @property
def vocabulary(self): def vocabulary(self):
@ -51,7 +55,7 @@ class NanoSocratesBPE(Encoder):
Returns: Returns:
int: int:
""" """
return self.vocabulary_size + 1 return self.vocabulary_size
# TODO: implement fit # TODO: implement fit
def fit( def fit(
@ -64,6 +68,7 @@ class NanoSocratesBPE(Encoder):
ENCODED_CHUNK = self.encode_intermediate(chunk_data) ENCODED_CHUNK = self.encode_intermediate(chunk_data)
DATA_LEN_BEFORE_LAST = len(ENCODED_CHUNK) - 1 DATA_LEN_BEFORE_LAST = len(ENCODED_CHUNK) - 1
# update frequency of each couple of element
for i in range(0, DATA_LEN_BEFORE_LAST): for i in range(0, DATA_LEN_BEFORE_LAST):
CANDIDATE_COUPLE = (ENCODED_CHUNK[i], ENCODED_CHUNK[i+1]) CANDIDATE_COUPLE = (ENCODED_CHUNK[i], ENCODED_CHUNK[i+1])
@ -77,6 +82,7 @@ class NanoSocratesBPE(Encoder):
frequency += 1 frequency += 1
memory.frequencies[CANDIDATE_COUPLE] = frequency memory.frequencies[CANDIDATE_COUPLE] = frequency
if not last_batch: if not last_batch:
return (self, memory, ENCODED_CHUNK) return (self, memory, ENCODED_CHUNK)
@ -126,13 +132,14 @@ class NanoSocratesBPE(Encoder):
def __round_encode(self, piece: list[int]): def __round_encode(self, piece: list[int]):
"""_summary_ """ A single round of encode that traverse all the object. Multiple round are needed for a full encode: \n
1) "ABAB" -> "XX"
2) "XX" -> "Y"
Args: Args:
piece (list[int]): _description_ piece (list[int]): the object to encode as a list of integer
Returns: Returns:
_type_: _description_ (list[int]): the one time encoded object
""" """
if len(piece) == 1: if len(piece) == 1:
@ -144,27 +151,32 @@ class NanoSocratesBPE(Encoder):
index = 0 index = 0
while index < PIECE_LENGTH: while index < PIECE_LENGTH:
CANDIDATE_WORD = (piece[index], piece[index + 1]) CANDIDATE_WORD = (piece[index], piece[index + 1]) # take a tuple of consecutive element [int]
CANDIDATE_TOKEN = self.__vocabulary.get(CANDIDATE_WORD) CANDIDATE_TOKEN = self.__vocabulary.get(CANDIDATE_WORD)
# if no token to substitute the tuple, append the first element
if CANDIDATE_TOKEN is None: if CANDIDATE_TOKEN is None:
NEW_PIECE.append(piece[index]) NEW_PIECE.append(piece[index])
index += 1 index += 1
# if the latter element of the tuple is the last element of the piece, append it
if index == PIECE_LENGTH: if index == PIECE_LENGTH:
NEW_PIECE.append(piece[index]) NEW_PIECE.append(piece[index])
continue continue
# in this case there was a candidate token to substitute the couple of element
NEW_PIECE.append(CANDIDATE_TOKEN) NEW_PIECE.append(CANDIDATE_TOKEN)
index += 2 index += 2
return NEW_PIECE return NEW_PIECE
# TODO: Remake decode to take a list of token IDs # TODO: Remake decode to take a list of token IDs
def decode(self, token_ids: list[int]) -> str: def decode(self, token_ids: list[int]) -> str:
# deque: double ended queue # deque: double ended queue
token_stack: deque[int] = deque(token_ids) token_stack: deque[int] = deque(token_ids)
UTF_8_STRING_ARR: bytearray = bytearray() UTF_8_STRING_ARR: bytearray = bytearray()
@ -199,7 +211,13 @@ class NanoSocratesBPE(Encoder):
return CANDIDATE_DECODED return CANDIDATE_DECODED
def __learn_word(self, words: tuple[int, int]): def __learn_word(self, words: tuple[int, int]):
""" learn a new couple of object in the vocabulary
Args:
words (tuple[int, int]): the Pair of element to substitute with a new tokenID
Raises:
DuplicateWordException: it launch if there is a duplicate of the new tokenID in the dictionary
"""
ID = self.__next_id ID = self.__next_id
DUPLICATE = self.__vocabulary.get(words) DUPLICATE = self.__vocabulary.get(words)