"""
Advanced usage example for NexForge Tokenizer Builder.

This example demonstrates:
- Custom special tokens
- Batch processing with progress tracking
- Vocabulary inspection and analysis
- Error handling with try/except/finally
- Performance tuning via chunk size and thread count
"""
import time
from pathlib import Path

from tqdm import tqdm

# Import the tokenizer components
from nexforgetokenizer import (
    build_tokenizer,
    SystemResources,
    log_memory_usage,  # not called in this example
    TokenizerError
)

def create_large_sample_dataset(num_files: int = 50, base_dir: str = "sample_data") -> Path:
    """Create a larger sample dataset with different file types."""
    base_path = Path(base_dir)
    
    # Clean up any previous sample data
    if base_path.exists():
        import shutil
        shutil.rmtree(base_path)
    
    # Create the dataset directory
    base_path.mkdir(parents=True, exist_ok=True)
    
    # Create Python files
    for i in tqdm(range(num_files // 2), desc="Writing Python files"):
        module_content = f"""
# Sample Python module {i}

def process_data(data):
    '''Process sample data.'''
    result = []
    for item in data:
        if item % 2 == 0:
            result.append(item * 2)
    return result
"""
        (base_path / f"module_{i}.py").write_text(module_content)
    
    # Create text files
    for i in tqdm(range(num_files // 2), desc="Writing text files"):
        doc_content = f"""
This is sample text document {i}.
It contains multiple lines of text with various tokens.
The quick brown fox jumps over the lazy dog.
Special characters: !@#$%^&*()_+-=[]{{}}|;':\",./<>?
"""
        (base_path / f"document_{i}.txt").write_text(doc_content)
    
    print(f"Created {num_files} sample files in {base_path}")
    return base_path

class DataProcessor:
    """Example data processor class for demonstration."""
    def __init__(self, config: dict):
        self.config = config
    
    def run(self):
        """Run the processor with the current config."""
        print(f"Processing with config: {self.config}")

class TokenizerAnalyzer:
    """Helper class for analyzing tokenizer performance and vocabulary."""
    
    def __init__(self, tokenizer_path: str):
        self.tokenizer_path = tokenizer_path
        self.tokenizer = None
        self.vocab = None
    
    def load(self):
        """Load the tokenizer."""
        from tokenizers import Tokenizer
        self.tokenizer = Tokenizer.from_file(self.tokenizer_path)
        self.vocab = {
            idx: self.tokenizer.id_to_token(idx)
            for idx in range(self.tokenizer.get_vocab_size())
        }
    
    def analyze_vocab(self, top_n: int = 20):
        """Analyze and print vocabulary statistics."""
        if not self.tokenizer:
            self.load()
        
        vocab_size = len(self.vocab)
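        # Heuristic: anything wrapped in square brackets (e.g. [PAD], [PYTHON]) is counted as special.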
        special_tokens = [
            token for token in self.vocab.values() 
            if token.startswith("[") and token.endswith("]")
        ]
        
        print(f"\n=== Vocabulary Analysis ===")
        print(f"Total vocabulary size: {vocab_size}")
        print(f"Special tokens ({len(special_tokens)}): {', '.join(special_tokens[:10])}" + 
              ("..." if len(special_tokens) > 10 else ""))
        
        # Show sample of vocabulary
        print(f"\nSample vocabulary items:")
        for idx in range(min(top_n, vocab_size)):
            print(f"  {idx}: {self.vocab.get(idx, 'N/A')}")
        
        if vocab_size > top_n:
            print(f"  ... and {vocab_size - top_n} more")

def main():
    """Run the advanced example."""
    print("NexForge Tokenizer Builder - Advanced Example")
    print("=========================================\n")
    
    # 1. Setup
    output_dir = Path("advanced_output")
    output_dir.mkdir(exist_ok=True)
    
    tokenizer_path = output_dir / "advanced_tokenizer.json"
    
    # 2. Check system resources
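    # SystemResources gathers CPU, RAM and (optional) CUDA info; it is passed to build_tokenizer below.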
    resources = SystemResources()
    print(f"\n=== System Resources ===")
    print(f"CPU Cores: {resources.cpu_cores}")
    print(f"Available RAM: {resources.available_ram_gb:.2f} GB")
    if resources.has_cuda:
        print(f"GPU: {resources.cuda_device} with {resources.cuda_mem_gb:.2f} GB")
    else:
        print("No CUDA GPU detected")
    
    # 3. Create sample dataset
    print("\n=== Creating Sample Dataset ===")
    dataset_path = create_large_sample_dataset(num_files=50)
    
    # 4. Custom special tokens
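    # Standard BERT-style specials plus custom domain markers for the mixed code/text dataset.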
    special_tokens = [
        "[PAD]", "[UNK]", "[CLS]", "[SEP]", "[MASK]",
        "[PYTHON]", "[TEXT]", "[CODE]"
    ]
    
    # 5. Build the tokenizer with advanced options
    print("\n=== Building Tokenizer ===")
    print(f"Input directory: {dataset_path}")
    print(f"Output path: {tokenizer_path}")
    
    start_time = time.time()
    
    try:
        success = build_tokenizer(
            input_dir=str(dataset_path),
            output_path=str(tokenizer_path),
            vocab_size=5000,  # Larger vocabulary for better coverage
            min_frequency=2,   # Only include tokens that appear at least twice
            special_tokens=special_tokens,
            resources=resources,
            max_files=50,      # Process all files
            chunk_size=100000, # Process in 100KB chunks
            n_threads=max(1, resources.cpu_cores - 1)  # Use all but one CPU core
        )
        
        if success:
            duration = time.time() - start_time
            print(f"\nTokenizer created successfully in {duration:.2f} seconds")
            print(f"Tokenizer saved to: {tokenizer_path}")
            
            # 6. Analyze the created tokenizer
            print("\n=== Tokenizer Analysis ===")
            analyzer = TokenizerAnalyzer(str(tokenizer_path))
            analyzer.load()
            analyzer.analyze_vocab()
            
            # 7. Show example encoding/decoding
            print("\n=== Example Encoding/Decoding ===")
            sample_text = "def hello_world():\n    print('Hello, world!')  # Sample Python code"
            
            encoded = analyzer.tokenizer.encode(sample_text)
            decoded = analyzer.tokenizer.decode(encoded.ids)
            
            print(f"Original: {sample_text}")
            print(f"Encoded: {encoded.ids}")
            print(f"Tokens: {encoded.tokens}")
            print(f"Decoded: {decoded}")
            
        else:
            print("\nFailed to create tokenizer")
    
    except TokenizerError as e:
        print(f"\nError creating tokenizer: {e}")
    except Exception as e:
        print(f"\nUnexpected error: {e}")
    finally:
        # 8. Cleanup (optional)
        # import shutil
        # shutil.rmtree(dataset_path, ignore_errors=True)
        pass
    
    print("\nExample completed!")

if __name__ == "__main__":
    main()