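"""Custom handler.py for a Hugging Face Inference Endpoint.

Locates a fine-tuned causal LM inside a checkpoint-100 folder in the
model repository and serves text generation through EndpointHandler.
"""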
from typing import Dict, List, Any
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import os

class EndpointHandler:
    def __init__(self, path=""):
        # Look for checkpoint-100 folder
        checkpoint_path = None
        
        if not path or path == "/repository":
            base_path = "."
        else:
            base_path = path
            
        # Check different possible locations
        possible_paths = [
            os.path.join(base_path, "checkpoint-100"),
            os.path.join(".", "checkpoint-100"),
            os.path.join("/repository", "checkpoint-100"),
            "checkpoint-100"
        ]
        
        for check_path in possible_paths:
            if os.path.exists(check_path) and os.path.isdir(check_path):
                # Verify it contains model files
                files = os.listdir(check_path)
                if any(f in files for f in ['config.json', 'pytorch_model.bin', 'model.safetensors']):
                    checkpoint_path = check_path
                    break
        
        if checkpoint_path is None:
            print(f"Available files in base path: {os.listdir(base_path) if os.path.exists(base_path) else 'Path does not exist'}")
            raise ValueError("Could not find checkpoint-100 folder with model files")
        
        print(f"Loading model from: {checkpoint_path}")
        print(f"Files in checkpoint: {os.listdir(checkpoint_path)}")
        
        # Load model and tokenizer from checkpoint-100
        self.tokenizer = AutoTokenizer.from_pretrained(checkpoint_path, trust_remote_code=True)
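        # device_map="auto" lets Accelerate place the weights across the
        # available GPUs/CPU (requires the accelerate package to be installed)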
        self.model = AutoModelForCausalLM.from_pretrained(
            checkpoint_path,
            device_map="auto",
            torch_dtype=torch.bfloat16,
            trust_remote_code=True,
        )
        
        # Fall back to the EOS token if the tokenizer defines no pad token
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

    def __call__(self, data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        data args:
            inputs (:str): a string to be generated from
            parameters (:dict): generation parameters
        Return:
            A :obj:`list` | `dict`: will be serialized and returned
        """
        # Get the input text; fall back to the raw payload if no "inputs" key is present
        inputs = data.pop("inputs", data)
        parameters = data.pop("parameters", {})
        
        # Handle string input directly
        if isinstance(inputs, str):
            input_text = inputs
        else:
            input_text = str(inputs)
        
        # Set default parameters
        max_new_tokens = parameters.get("max_new_tokens", 1000)
        temperature = parameters.get("temperature", 0.1)
        do_sample = parameters.get("do_sample", True)
        top_p = parameters.get("top_p", 0.9)
        return_full_text = parameters.get("return_full_text", False)
        
        # Tokenize the input (returns input_ids plus attention_mask)
        model_inputs = self.tokenizer(
            input_text,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=2048,
        ).to(self.model.device)
        
        # Generate text
        with torch.no_grad():
            generated_ids = self.model.generate(
                **model_inputs,
                max_new_tokens=max_new_tokens,
                temperature=temperature,
                do_sample=do_sample,
                top_p=top_p,
                pad_token_id=self.tokenizer.pad_token_id,
                eos_token_id=self.tokenizer.eos_token_id,
            )
        
        # Decode the generated text
        if return_full_text:
            generated_text = self.tokenizer.decode(generated_ids[0], skip_special_tokens=True)
        else:
            # Only return the newly generated part (slice off the prompt tokens)
            new_tokens = generated_ids[0][model_inputs["input_ids"].shape[1]:]
            generated_text = self.tokenizer.decode(new_tokens, skip_special_tokens=True)
            
        return [{"generated_text": generated_text}]