File size: 14,588 Bytes
9390992
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22fa3c4
9390992
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
245587d
9390992
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22fa3c4
 
9390992
22fa3c4
9390992
 
 
 
 
 
 
 
 
c86f4fa
9390992
1522e0a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c86f4fa
245587d
 
1522e0a
 
c86f4fa
1522e0a
c86f4fa
 
1522e0a
c86f4fa
 
1522e0a
 
 
245587d
1522e0a
22fa3c4
245587d
1522e0a
 
 
 
 
 
 
 
 
 
 
 
9390992
 
22fa3c4
c86f4fa
187dd30
d087a59
187dd30
22fa3c4
 
d087a59
 
 
c86f4fa
 
 
 
 
 
d087a59
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c86f4fa
 
d087a59
 
 
 
 
 
 
 
 
 
 
 
 
 
c86f4fa
 
 
d087a59
 
 
c86f4fa
d087a59
 
c86f4fa
d087a59
 
c86f4fa
 
d087a59
c86f4fa
 
d087a59
c86f4fa
d087a59
 
c86f4fa
 
d087a59
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
187dd30
9390992
 
 
8f8cb24
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
from transformers import AutoProcessor, AutoModelForImageTextToText
from PIL import Image
import torch
import logging
from typing import Union, Tuple
from config import Config
from knowledge_base import GarbageClassificationKnowledge


class GarbageClassifier:
    def __init__(self, config: Config = None):
        """Create a classifier; the model itself is loaded later by load_model().

        Args:
            config: Settings object to use. When omitted (or falsy) a fresh
                ``Config()`` is created.
        """
        if not config:
            config = Config()
        self.config = config
        self.knowledge = GarbageClassificationKnowledge()

        # Both stay None until load_model() assigns them.
        self.model = None
        self.processor = None

        # Pick CUDA when torch reports it as available, CPU otherwise.
        device_name = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = torch.device(device_name)

        # Configure logging at INFO level and grab a module-named logger.
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def load_model(self):
        """Load the processor and model named by ``self.config.MODEL_NAME``.

        The optional ``self.config.HF_TOKEN`` is forwarded to BOTH
        ``from_pretrained`` calls; previously only the processor received it,
        so the model download could fail for checkpoints that need
        authentication.

        ``self.processor`` and ``self.model`` are assigned together only after
        both loads succeed, so a failure never leaves the instance half-loaded.

        Raises:
            Exception: Whatever ``from_pretrained`` raises is logged and
                re-raised unchanged.
        """
        try:
            self.logger.info(f"Loading model: {self.config.MODEL_NAME}")

            # Authentication is optional; only pass a token when one is set.
            auth_kwargs = {}
            if self.config.HF_TOKEN:
                auth_kwargs["token"] = self.config.HF_TOKEN

            # Load processor
            processor = AutoProcessor.from_pretrained(
                self.config.MODEL_NAME, **auth_kwargs
            )

            # Load model (same credentials as the processor)
            model = AutoModelForImageTextToText.from_pretrained(
                self.config.MODEL_NAME,
                torch_dtype=self.config.TORCH_DTYPE,
                device_map=self.config.DEVICE_MAP,
                **auth_kwargs,
            )

            # Publish both only once both are available.
            self.processor = processor
            self.model = model

            self.logger.info("Model loaded successfully")

        except Exception as e:
            self.logger.error(f"Error loading model: {str(e)}")
            raise

    def preprocess_image(self, image: Image.Image) -> Image.Image:
        """
        Letterbox an image onto a 512x512 white RGB canvas.

        The image is converted to RGB if needed, scaled so its longer side is
        512 pixels while keeping the aspect ratio, and pasted centred on a
        white background. (The original code states 512x512 is what Gemma3n
        requires.)

        Args:
            image: Source PIL image.

        Returns:
            A new 512x512 RGB image.

        Raises:
            ValueError: If the source image has a zero width or height.
        """
        # Convert to RGB if necessary
        if image.mode != "RGB":
            image = image.convert("RGB")

        target_size = (512, 512)

        original_width, original_height = image.size
        if original_width <= 0 or original_height <= 0:
            # Fail with a clear message instead of a ZeroDivisionError below.
            raise ValueError("Image must have a non-zero width and height")

        aspect_ratio = original_width / original_height

        if aspect_ratio > 1:
            # Width is larger
            new_width = target_size[0]
            # Clamp to 1: very wide images would otherwise truncate to 0,
            # which resize() rejects.
            new_height = max(1, int(target_size[0] / aspect_ratio))
        else:
            # Height is larger or equal
            new_height = target_size[1]
            # Clamp to 1 for very tall images, same reason as above.
            new_width = max(1, int(target_size[1] * aspect_ratio))

        # Resize image maintaining aspect ratio
        image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)

        # White canvas of the target size to hold the resized image
        processed_image = Image.new("RGB", target_size, (255, 255, 255))

        # Centre the resized image on the canvas
        x_offset = (target_size[0] - new_width) // 2
        y_offset = (target_size[1] - new_height) // 2
        processed_image.paste(image, (x_offset, y_offset))

        return processed_image

    def classify_image(self, image: Union[str, Image.Image]) -> Tuple[str, str]:
        """
        Classify garbage in the image

        Args:
            image: PIL Image or path to image file

        Returns:
            Tuple of (classification_result, detailed_analysis)
        """
        if self.model is None or self.processor is None:
            raise RuntimeError("Model not loaded. Call load_model() first.")

        try:
            # Load and process image
            if isinstance(image, str):
                image = Image.open(image)
            elif not isinstance(image, Image.Image):
                raise ValueError("Image must be a PIL Image or file path")

            # Preprocess image to meet Gemma3n requirements
            processed_image = self.preprocess_image(image)

            # Prepare messages with system prompt and user query
            messages = [
                {
                    "role": "system",
                    "content": [
                        {
                            "type": "text",
                            "text": self.knowledge.get_system_prompt(),
                        }
                    ],
                },
                {
                    "role": "user",
                    "content": [
                        {"type": "image", "image": processed_image},
                        {
                            "type": "text",
                            "text": "Please classify what you see in this image. If it shows garbage/waste items, classify them according to the garbage classification standards. If it shows people, living things, or other non-waste items, classify it as 'Unable to classify' and explain why it's not garbage.",
                        },
                    ],
                },
            ]

            # Apply chat template and tokenize
            inputs = self.processor.apply_chat_template(
                messages,
                add_generation_prompt=True,
                tokenize=True,
                return_dict=True,
                return_tensors="pt",
            ).to(self.model.device, dtype=self.model.dtype)
            input_len = inputs["input_ids"].shape[-1]

            outputs = self.model.generate(
                **inputs,
                max_new_tokens=self.config.MAX_NEW_TOKENS,
                disable_compile=True,
            )
            response = self.processor.batch_decode(
                outputs[:, input_len:],
                skip_special_tokens=True,
            )[0]

            # Extract classification from response
            classification = self._extract_classification(response)

            # Extract reasoning from response
            reasoning = self._extract_reasoning(response)

            return classification, reasoning

        except Exception as e:
            self.logger.error(f"Error during classification: {str(e)}")
            import traceback

            traceback.print_exc()
            return "Error", f"Classification failed: {str(e)}"

    def _extract_classification(self, response: str) -> str:
        """Extract the main classification from the response"""
        response_lower = response.lower()
        
        # First, look for positive waste category indicators
        # Check exact category matches first
        categories = self.knowledge.get_categories()
        waste_categories = [cat for cat in categories if cat != "Unable to classify"]
        
        for category in waste_categories:
            if category.lower() in response_lower:
                # Make sure it's not in a negative context
                category_index = response_lower.find(category.lower())
                context_before = response_lower[max(0, category_index-30):category_index]
                
                # Only skip if there's a clear negation right before
                if not any(neg in context_before[-10:] for neg in ["not", "cannot", "isn't", "doesn't"]):
                    return category
        
        # Look for strong recyclable indicators
        recyclable_indicators = [
            "recyclable", "recycle", "aluminum", "plastic", "glass", "metal",
            "foil", "can", "bottle", "cardboard", "paper", "tin", "steel", "iron"
        ]
        
        if any(indicator in response_lower for indicator in recyclable_indicators):
            # Check if it's explicitly said to be recyclable
            recyclable_phrases = [
                "recyclable", "can be recycled", "made of recyclable",
                "recyclable material", "recyclable aluminum", "recyclable plastic"
            ]
            if any(phrase in response_lower for phrase in recyclable_phrases):
                return "Recyclable Waste"
            
            # Check for specific materials
            if any(material in response_lower for material in ["aluminum", "foil", "metal"]):
                return "Recyclable Waste"
            if any(material in response_lower for material in ["plastic", "bottle"]):
                return "Recyclable Waste"
            if any(material in response_lower for material in ["glass", "cardboard", "paper"]):
                return "Recyclable Waste"
        
        # Look for food waste indicators
        food_indicators = [
            "food", "fruit", "vegetable", "organic", "kitchen waste",
            "peel", "core", "scraps", "leftovers"
        ]
        if any(indicator in response_lower for indicator in food_indicators):
            return "Food/Kitchen Waste"
        
        # Look for hazardous waste indicators
        hazardous_indicators = [
            "battery", "chemical", "medicine", "paint", "toxic", "hazardous"
        ]
        if any(indicator in response_lower for indicator in hazardous_indicators):
            return "Hazardous Waste"
        
        # Look for other waste indicators
        other_waste_indicators = [
            "cigarette", "ceramic", "dust", "diaper", "tissue", "other waste"
        ]
        if any(indicator in response_lower for indicator in other_waste_indicators):
            return "Other Waste"
        
        # Only classify as "Unable to classify" if there are explicit indicators
        unable_phrases = [
            "unable to classify",
            "cannot classify",
            "cannot be classified as waste",
            "not garbage", "not waste", "not trash"
        ]
        
        if any(phrase in response_lower for phrase in unable_phrases):
            return "Unable to classify"
        
        # Check for non-garbage items (people, living things, etc.)
        non_garbage_indicators = [
            "person", "people", "human", "face", "man", "woman",
            "living", "alive", "animal", "pet",
            "portrait", "photo of a person"
        ]
        
        if any(indicator in response_lower for indicator in non_garbage_indicators):
            return "Unable to classify"
        
        # If we found waste-related content but no clear category, try to infer
        waste_related = any(word in response_lower for word in [
            "waste", "trash", "garbage", "discard", "throw", "bin"
        ])
        
        if waste_related:
            # Default to Other Waste if it's clearly waste but unclear category
            return "Other Waste"
        
        # If no clear classification found and no clear non-waste indicators, 
        # default to "Unable to classify"
        return "Unable to classify"

    def _extract_reasoning(self, response: str) -> str:
        """Extract only the reasoning content, removing all formatting markers and classification info"""
        import re
        
        # Remove all formatting markers
        cleaned_response = response.replace("**Classification**:", "")
        cleaned_response = cleaned_response.replace("**Reasoning**:", "")
        cleaned_response = re.sub(r'\*\*.*?\*\*:', '', cleaned_response)  # Remove any **text**: patterns
        cleaned_response = cleaned_response.replace("**", "")  # Remove remaining ** markers
        
        # Remove category names that might appear at the beginning
        categories = self.knowledge.get_categories()
        for category in categories:
            if cleaned_response.strip().startswith(category):
                cleaned_response = cleaned_response.replace(category, "", 1)
                break
        
        # Remove common material names that might appear at the beginning
        material_names = [
            "Glass", "Plastic", "Metal", "Paper", "Cardboard", "Aluminum", 
            "Steel", "Iron", "Tin", "Foil", "Wood", "Ceramic", "Fabric",
            "Recyclable Waste", "Food/Kitchen Waste", "Hazardous Waste", "Other Waste"
        ]
        
        # Clean the response
        cleaned_response = cleaned_response.strip()
        
        # Remove material names at the beginning
        for material in material_names:
            if cleaned_response.startswith(material):
                # Remove the material name and any following punctuation/whitespace
                cleaned_response = cleaned_response[len(material):].lstrip(" .,;:")
                break
        
        # Split into sentences and clean up
        sentences = []
        
        # Split by common sentence endings, but keep the endings
        parts = re.split(r'([.!?])\s+', cleaned_response)
        
        # Rejoin parts to maintain sentence structure
        reconstructed_parts = []
        for i in range(0, len(parts), 2):
            if i < len(parts):
                sentence = parts[i]
                if i + 1 < len(parts):
                    sentence += parts[i + 1]  # Add the punctuation back
                reconstructed_parts.append(sentence)
        
        for part in reconstructed_parts:
            part = part.strip()
            if not part:
                continue
                
            # Skip parts that are just category names or material names
            if part in categories or part.rstrip(".,;:") in material_names:
                continue
                
            # Skip parts that start with category names or material names
            is_category_line = False
            for item in categories + material_names:
                if part.startswith(item):
                    is_category_line = True
                    break
            
            if is_category_line:
                continue
                
            # Clean up the sentence
            part = re.sub(r'^[A-Za-z\s]+:', '', part).strip()  # Remove "Category:" type prefixes
            
            if part and len(part) > 3:  # Only keep meaningful content
                sentences.append(part)
        
        # Join sentences
        reasoning = ' '.join(sentences)
        
        # Final cleanup - remove any remaining standalone material words at the beginning
        reasoning_words = reasoning.split()
        if reasoning_words and reasoning_words[0] in [m.lower() for m in material_names]:
            reasoning_words = reasoning_words[1:]
            reasoning = ' '.join(reasoning_words)
        
        # Ensure proper capitalization
        if reasoning:
            reasoning = reasoning[0].upper() + reasoning[1:] if len(reasoning) > 1 else reasoning.upper()
            
            # Ensure proper punctuation
            if not reasoning.endswith(('.', '!', '?')):
                reasoning += '.'
                
        return reasoning if reasoning else "Analysis not available"

    def get_categories_info(self):
        """Get information about all categories"""
        return self.knowledge.get_category_descriptions()