"""
Advanced usage example for NexForge Tokenizer Builder.

This example demonstrates:
- Custom special tokens
- Batch processing with progress tracking (see the count_corpus_lines sketch below)
- Vocabulary inspection and analysis
- Error handling and recovery
- Performance optimization
"""
import os
import json
import time
from pathlib import Path
from typing import Dict, List, Optional

from tqdm import tqdm

from nexforgetokenizer import (
    build_tokenizer,
    SystemResources,
    log_memory_usage,
    TokenizerError
)


def create_large_sample_dataset(num_files: int = 50, base_dir: str = "sample_data") -> Path:
    """Create a larger sample dataset with different file types."""
    base_path = Path(base_dir)
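
    # Remove any leftover data from a previous run so the dataset is rebuilt from scratch.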
    if base_path.exists():
        import shutil
        shutil.rmtree(base_path)

    base_path.mkdir(exist_ok=True)
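
    # Half of the files are small Python source modules.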
    for i in range(num_files // 2):
        module_content = f"""
# Sample Python module {i}

def process_data(data):
    '''Process sample data.'''
    result = []
    for item in data:
        if item % 2 == 0:
            result.append(item * 2)
    return result
"""
        (base_path / f"module_{i}.py").write_text(module_content)
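
    # The other half are plain-text documents with varied punctuation.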
    for i in range(num_files // 2):
        doc_content = f"""
This is sample text document {i}.
It contains multiple lines of text with various tokens.
The quick brown fox jumps over the lazy dog.
Special characters: !@#$%^&*()_+-=[]{{}}|;':\",./<>?
"""
        (base_path / f"document_{i}.txt").write_text(doc_content)

    print(f"Created {num_files} sample files in {base_path}")
    return base_path
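
# The module docstring promises "batch processing with progress tracking", and tqdm is
# imported above but not used elsewhere in this example. The helper below is a minimal
# sketch of what that tracking could look like; it is not part of the NexForge API, and
# its name and behaviour are illustrative assumptions only.
def count_corpus_lines(input_dir: Path) -> int:
    """Count lines across every file under input_dir, with a tqdm progress bar."""
    files = sorted(p for p in Path(input_dir).iterdir() if p.is_file())
    total = 0
    for path in tqdm(files, desc="Scanning corpus", unit="file"):
        total += len(path.read_text(encoding="utf-8").splitlines())
    return total
# Optional usage: count_corpus_lines(Path("sample_data")) after the dataset has been created.
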
class DataProcessor:
    """Example data processor class for demonstration."""

    def __init__(self, config: dict):
        self.config = config

    def run(self):
        """Run the processor with the current config."""
        print(f"Processing with config: {self.config}")


class TokenizerAnalyzer:
    """Helper class for analyzing tokenizer performance and vocabulary."""

    def __init__(self, tokenizer_path: str):
        self.tokenizer_path = tokenizer_path
        self.tokenizer = None
        self.vocab = None

    def load(self):
        """Load the tokenizer."""
        from tokenizers import Tokenizer
        self.tokenizer = Tokenizer.from_file(self.tokenizer_path)
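        # Build an id -> token mapping so the vocabulary can be inspected directly.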
        self.vocab = {
            idx: self.tokenizer.id_to_token(idx)
            for idx in range(self.tokenizer.get_vocab_size())
        }

    def analyze_vocab(self, top_n: int = 20):
        """Analyze and print vocabulary statistics."""
        if not self.tokenizer:
            self.load()

        vocab_size = len(self.vocab)
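        # Treat tokens wrapped in square brackets (e.g. [PAD]) as special tokens.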
        special_tokens = [
            token for token in self.vocab.values()
            if token.startswith("[") and token.endswith("]")
        ]

        print("\n=== Vocabulary Analysis ===")
        print(f"Total vocabulary size: {vocab_size}")
        print(f"Special tokens ({len(special_tokens)}): {', '.join(special_tokens[:10])}" +
              ("..." if len(special_tokens) > 10 else ""))

        print("\nSample vocabulary items:")
        for idx in range(min(top_n, vocab_size)):
            print(f" {idx}: {self.vocab.get(idx, 'N/A')}")

        if vocab_size > top_n:
            print(f" ... and {vocab_size - top_n} more")


def main():
    """Run the advanced example."""
    print("NexForge Tokenizer Builder - Advanced Example")
    print("=============================================\n")
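
    # All artifacts produced by this example are written to a dedicated directory.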
    output_dir = Path("advanced_output")
    output_dir.mkdir(exist_ok=True)

    tokenizer_path = output_dir / "advanced_tokenizer.json"
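
    # Inspect the available hardware; the same object is passed to build_tokenizer below.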
    resources = SystemResources()
    print("\n=== System Resources ===")
    print(f"CPU Cores: {resources.cpu_cores}")
    print(f"Available RAM: {resources.available_ram_gb:.2f} GB")
    if resources.has_cuda:
        print(f"GPU: {resources.cuda_device} with {resources.cuda_mem_gb:.2f} GB")
    else:
        print("No CUDA GPU detected")

    print("\n=== Creating Sample Dataset ===")
    dataset_path = create_large_sample_dataset(num_files=50)
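
    # Standard BERT-style special tokens plus custom markers for Python code and plain text.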
    special_tokens = [
        "[PAD]", "[UNK]", "[CLS]", "[SEP]", "[MASK]",
        "[PYTHON]", "[TEXT]", "[CODE]"
    ]

    print("\n=== Building Tokenizer ===")
    print(f"Input directory: {dataset_path}")
    print(f"Output path: {tokenizer_path}")

    start_time = time.time()
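
    # Build the tokenizer, leaving one CPU core free for the rest of the system.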
    try:
        success = build_tokenizer(
            input_dir=str(dataset_path),
            output_path=str(tokenizer_path),
            vocab_size=5000,
            min_frequency=2,
            special_tokens=special_tokens,
            resources=resources,
            max_files=50,
            chunk_size=100000,
            n_threads=max(1, resources.cpu_cores - 1)
        )

        if success:
            duration = time.time() - start_time
            print(f"\nTokenizer created successfully in {duration:.2f} seconds")
            print(f"Tokenizer saved to: {tokenizer_path}")
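
            # Reload the trained tokenizer from disk and summarize its vocabulary.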
            print("\n=== Tokenizer Analysis ===")
            analyzer = TokenizerAnalyzer(str(tokenizer_path))
            analyzer.load()
            analyzer.analyze_vocab()

            print("\n=== Example Encoding/Decoding ===")
            sample_text = "def hello_world():\n print('Hello, world!') # Sample Python code"
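
            # encode() returns an Encoding object; its .ids and .tokens are printed below.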
            encoded = analyzer.tokenizer.encode(sample_text)
            decoded = analyzer.tokenizer.decode(encoded.ids)

            print(f"Original: {sample_text}")
            print(f"Encoded: {encoded.ids}")
            print(f"Tokens: {encoded.tokens}")
            print(f"Decoded: {decoded}")

        else:
            print("\nFailed to create tokenizer")

    except TokenizerError as e:
        print(f"\nError creating tokenizer: {e}")
    except Exception as e:
        print(f"\nUnexpected error: {e}")
    finally:
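        # Optional cleanup (for example, removing the sample_data directory) could go here.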
        pass

    print("\nExample completed!")


if __name__ == "__main__":
    main()