"""
This module provides real-time data integration and processing capabilities.
"""

import asyncio
import aiohttp
import json
from typing import Dict, Any, List, Optional
from datetime import datetime


class RealTimeDataIntegrator:
    """Poll configured HTTP data sources and buffer their JSON payloads.

    Each source is registered with a config dict, polled by its own asyncio
    task, and has its most recent payloads kept in a bounded in-memory list.

    Config keys read by this class:
        url: Address fetched with an HTTP GET on every poll.
        interval: Seconds to sleep between polls (default ``1.0``).
        buffer_size: Maximum number of buffered data points (default ``1000``).
    """

    def __init__(self):
        # source_id -> {"config", "status", "last_update", "error_count"}
        self.data_sources = {}
        # source_id -> asyncio.Task running _stream_data for that source
        self.active_streams = {}
        # source_id -> list of {"timestamp", "data"} dicts, oldest first
        self.data_buffer = {}
        # Shared aiohttp.ClientSession, created lazily by initialize()
        self.session = None

    async def initialize(self):
        """
        Initialize the data integrator with a new aiohttp session.

        Does nothing if a session already exists.
        """
        if not self.session:
            self.session = aiohttp.ClientSession()

    async def add_data_source(self, source_id: str, config: Dict[str, Any]) -> bool:
        """
        Add a new data source for real-time integration.

        Registering an existing ``source_id`` replaces its entry and resets
        its status, last update and error count.

        Args:
            source_id: Unique identifier for the data source
            config: Configuration for the data source

        Returns:
            Success status
        """
        try:
            self.data_sources[source_id] = {
                "config": config,
                "status": "configured",
                "last_update": None,
                "error_count": 0,
            }
            return True
        except Exception as e:
            print(f"Error adding data source: {e}")
            return False

    async def start_stream(self, source_id: str) -> bool:
        """
        Start streaming data from a specific source.

        If the source is already streaming, the running task is cancelled
        and replaced, so at most one polling task exists per source.
        The buffer for the source is reset on every start.

        Args:
            source_id: ID of the data source to stream from

        Returns:
            Success status; False if the source is unknown or the task could
            not be created
        """
        if source_id not in self.data_sources:
            return False

        try:
            # Cancel a previous task for this source; overwriting the handle
            # without cancelling would leave an orphaned poller running.
            previous = self.active_streams.pop(source_id, None)
            if previous is not None and not previous.done():
                previous.cancel()

            # Initialize buffer for this stream
            self.data_buffer[source_id] = []

            # Start async streaming task
            self.active_streams[source_id] = asyncio.create_task(
                self._stream_data(source_id)
            )
            return True
        except Exception as e:
            print(f"Error starting stream: {e}")
            return False

    async def stop_stream(self, source_id: str) -> bool:
        """
        Stop streaming from a specific source.

        The polling task is cancelled and awaited, so it has finished by
        the time this method returns.

        Args:
            source_id: ID of the data source to stop

        Returns:
            Success status; False if the source has no active stream or
            stopping it failed
        """
        task = self.active_streams.pop(source_id, None)
        if task is None:
            return False

        try:
            task.cancel()
            # A task cannot wait for itself; skip the wait in that case.
            if task is not asyncio.current_task():
                # return_exceptions=True collects the task's CancelledError
                # as a result, while a cancellation of the *caller* still
                # propagates normally.
                await asyncio.gather(task, return_exceptions=True)
            return True
        except Exception as e:
            print(f"Error stopping stream: {e}")
            return False

    async def get_latest_data(self, source_id: str) -> Optional[Dict[str, Any]]:
        """
        Get the latest data from a specific source.

        Args:
            source_id: ID of the data source to query

        Returns:
            Latest data point if available, otherwise None
        """
        buffered = self.data_buffer.get(source_id)
        if buffered:
            return buffered[-1]
        return None

    async def _stream_data(self, source_id: str):
        """
        Internal method to handle continuous data streaming.

        Polls the source URL forever, sleeping ``interval`` seconds between
        requests. Non-200 responses and exceptions raised while polling
        increment the source's ``error_count``; the loop only ends when the
        task is cancelled.
        """
        config = self.data_sources[source_id]["config"]
        url = config.get("url")
        interval = config.get("interval", 1.0)  # Default to 1 second

        while True:
            try:
                if not self.session:
                    await self.initialize()

                async with self.session.get(url) as response:
                    if response.status == 200:
                        data = await response.json()
                        self._process_data(source_id, data)
                    else:
                        self.data_sources[source_id]["error_count"] += 1
            except Exception as e:
                print(f"Stream error for {source_id}: {e}")
                self.data_sources[source_id]["error_count"] += 1

            await asyncio.sleep(interval)

    def _process_data(self, source_id: str, data: Dict[str, Any]):
        """
        Process incoming data from a stream.

        Wraps the payload with a timestamp, appends it to the source's
        buffer, trims the buffer to the configured ``buffer_size`` and marks
        the source as active.
        """
        # Add timestamp
        processed_data = {
            "timestamp": datetime.now().isoformat(),
            "data": data,
        }

        # Update buffer; create it if it is missing so a payload cannot
        # fail with KeyError.
        buffer = self.data_buffer.setdefault(source_id, [])
        buffer.append(processed_data)

        # Limit buffer size
        max_buffer = self.data_sources[source_id]["config"].get("buffer_size", 1000)
        if max_buffer <= 0:
            # buffer[-0:] would select the whole list, so a non-positive
            # size must be handled explicitly: retain nothing.
            buffer.clear()
        elif len(buffer) > max_buffer:
            del buffer[:-max_buffer]

        # Update source status
        self.data_sources[source_id]["last_update"] = processed_data["timestamp"]
        self.data_sources[source_id]["status"] = "active"

    def get_source_status(self, source_id: str) -> Dict[str, Any]:
        """
        Get status information for a data source.

        Args:
            source_id: ID of the data source to query

        Returns:
            Status information for the source, or ``{"status": "not_found"}``
            if the source is not registered
        """
        if source_id not in self.data_sources:
            return {"status": "not_found"}

        source = self.data_sources[source_id]
        return {
            "status": source["status"],
            "last_update": source["last_update"],
            "error_count": source["error_count"],
            "buffer_size": len(self.data_buffer.get(source_id, [])),
        }

    async def clear_data(self, source_id: str) -> bool:
        """
        Clear stored data for a specific source.

        Args:
            source_id: ID of the data source to clear

        Returns:
            Success status; False if the source has no buffer
        """
        if source_id in self.data_buffer:
            self.data_buffer[source_id] = []
            return True
        return False

    async def shutdown(self):
        """
        Cleanup and shutdown all data streams.

        Stops every active stream, closes the aiohttp session and discards
        all buffered data.
        """
        # Cancel all active streams; iterate over a snapshot because
        # stop_stream removes entries from the dict.
        for source_id in list(self.active_streams.keys()):
            await self.stop_stream(source_id)

        # Close aiohttp session
        if self.session:
            await self.session.close()
            self.session = None

        # Clear buffers
        self.data_buffer = {}