Spaces:
Sleeping
Sleeping
File size: 16,040 Bytes
5a1c0ee 86f8031 5655229 86f8031 5655229 5a1c0ee 86f8031 4148517 786f81b 6d26f2a 786f81b 4148517 b20ede1 3c1dab5 86f8031 3c1dab5 5655229 b20ede1 5655229 b20ede1 5655229 3c1dab5 b20ede1 3c1dab5 b20ede1 3c1dab5 86f8031 3c1dab5 86f8031 3c1dab5 86f8031 3c1dab5 86f8031 3c1dab5 86f8031 3c1dab5 86f8031 3c1dab5 86f8031 5a1c0ee 255303a 5a1c0ee 0b2ce51 00694e7 de92129 00694e7 0b2ce51 00694e7 5a1c0ee 0b2ce51 de92129 5a1c0ee 86f8031 5a1c0ee de92129 5a1c0ee 65c8a65 86f8031 65c8a65 86f8031 5a1c0ee b20ede1 2aa7ef4 b20ede1 3c1dab5 d0aac58 3c1dab5 d0aac58 65c8a65 86f8031 5a1c0ee 86f8031 de92129 86f8031 6ccd679 86f8031 5a1c0ee 34076e0 86f8031 5a1c0ee 0b2ce51 5a1c0ee de92129 0b2ce51 86f8031 de92129 86f8031 de92129 34076e0 de92129 34076e0 86f8031 de92129 5a1c0ee b977bdd 34076e0 4020008 34076e0 4020008 34076e0 4020008 34076e0 86f8031 b977bdd d0aac58 b977bdd 86f8031 d0aac58 86f8031 b977bdd d0aac58 b977bdd d0aac58 34076e0 86f8031 4020008 b977bdd 4020008 5a1c0ee 86f8031 5a1c0ee 4020008 5a1c0ee 86f8031 65c8a65 86f8031 5a1c0ee 86f8031 65c8a65 86f8031 3d6c93e 86f8031 3d6c93e 86f8031 d0aac58 3d6c93e d0aac58 3d6c93e 86f8031 d0aac58 86f8031 65c8a65 86f8031 3d6c93e eb844af 3d6c93e eb844af 3d6c93e eb844af 3d6c93e eb844af 3d6c93e eb844af 3d6c93e 536675c eb844af 86f8031 3c1dab5 b977bdd 86f8031 d0aac58 6d26f2a 163e549 6b98718 163e549 7baae84 163e549 b977bdd 163e549 b977bdd 65c8a65 86f8031 69be800 86f8031 7baae84 d0aac58 86f8031 69be800 86f8031 65c8a65 e47833a b977bdd 5130e7d e47833a 3c1dab5 5130e7d 65c8a65 d0aac58 b20ede1 3c1dab5 d0aac58 b283197 eb844af b283197 65c8a65 d0aac58 86f8031 5e8a670 65c8a65 86f8031 65c8a65 86f8031 5a1c0ee 86f8031 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 |
import gradio as gr
import numpy as np
import cv2
from PIL import Image
import requests
from datetime import datetime
import io
import os
from urllib.parse import urljoin
from bs4 import BeautifulSoup
# Default parameters
low_int = 10
high_int = 255
edge_thresh = 100
accum_thresh = 20
center_tol = 60
morph_dia = 2
min_rad = 40
def fetch_sdo_images(max_images, size="1024by960", tool="ccor1"):
"""Fetch SDO images from NOAA URL directory."""
try:
# Construct the directory URL
base_url = f"https://services.swpc.noaa.gov/images/animations/{tool}/"
# Fetch the directory listing
response = requests.get(base_url, timeout=10)
if response.status_code != 200:
return None, f"Failed to access directory {base_url}: Status {response.status_code}", 0
# Parse HTML with BeautifulSoup
soup = BeautifulSoup(response.text, 'html.parser')
links = soup.find_all('a')
# Extract image filenames matching the pattern YYYYMMDD_HHMM_{tool}_{size}.jpg
image_files = []
for link in links:
href = link.get('href')
if href and href.endswith(f"_{tool}_{size}.jpg"):
# Check if the filename starts with a valid timestamp (YYYYMMDD_HHMM)
if len(href) >= 12 and href[:8].isdigit() and href[9:13].isdigit():
image_files.append(href)
# Sort images by timestamp (most recent first)
image_files = sorted(image_files, key=lambda x: x[:13], reverse=True)
total_images = len(image_files)
if not image_files:
return None, f"No images found with size={size}, tool={tool}.", total_images
# Fetch up to max_images
frames = []
for img_file in image_files[:max_images]:
url = urljoin(base_url, img_file)
try:
img_response = requests.get(url, timeout=5)
if img_response.status_code == 200:
img = Image.open(io.BytesIO(img_response.content)).convert('L') # Convert to grayscale
frames.append(np.array(img))
else:
print(f"Failed to fetch {url}: Status {img_response.status_code}")
except Exception as e:
print(f"Error fetching {url}: {str(e)}")
if not frames:
return None, f"No valid images fetched.", total_images
# Reverse frames to display from oldest to newest
frames = frames[::-1]
return frames, f"Fetched {len(frames)} images. Total available: {total_images}.", total_images
except Exception as e:
return None, f"Error fetching images: {str(e)}", 0
def extract_frames(gif_path):
"""Extract frames from a GIF and return as a list of numpy arrays."""
try:
img = Image.open(gif_path)
frames = []
while True:
frame = img.convert('L') # Convert to grayscale
frames.append(np.array(frame))
try:
img.seek(img.tell() + 1)
except EOFError:
break
return frames, None
except Exception as e:
return None, f"Error loading GIF: {str(e)}"
def preprocess_frame(frame, lower_bound, upper_bound, morph_iterations):
"""Preprocess a frame: isolate mid-to-light pixels and enhance circular patterns."""
blurred = cv2.GaussianBlur(frame, (9, 9), 0)
mask = cv2.inRange(blurred, lower_bound, upper_bound)
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
enhanced = cv2.dilate(mask, kernel, iterations=morph_iterations)
return enhanced
def detect_circles(frame_diff, image_center, center_tolerance, param1, param2, min_radius=20, max_radius=200):
"""Detect circles in a frame difference image, centered at the Sun."""
circles = cv2.HoughCircles(
frame_diff,
cv2.HOUGH_GRADIENT,
dp=1.5,
minDist=100,
param1=param1,
param2=param2,
minRadius=min_radius,
maxRadius=max_radius
)
if circles is not None:
circles = np.round(circles[0, :]).astype("int")
filtered_circles = []
for (x, y, r) in circles:
if (abs(x - image_center[0]) < center_tolerance and
abs(y - image_center[1]) < center_tolerance):
filtered_circles.append((x, y, r))
return filtered_circles if filtered_circles else None
return None
def create_gif(frames, output_path, duration=0.5, scale_to_512=False):
"""Create a GIF from a list of frames, optionally scaling to 512x512."""
pil_frames = [Image.fromarray(frame) for frame in frames]
if scale_to_512:
pil_frames = [frame.resize((512, 512), Image.Resampling.LANCZOS) for frame in pil_frames]
pil_frames[0].save(
output_path,
save_all=True,
append_images=pil_frames[1:],
duration=int(duration * 1000), # Duration in milliseconds
loop=0
)
return output_path
def handle_fetch(max_images, size, tool):
"""Fetch SDO images and return frames for preview and state."""
frames, message, total_images = fetch_sdo_images(max_images, size, tool)
if frames is None:
return message, [], frames, total_images
preview_frames = [Image.fromarray(frame) for frame in frames]
return message, preview_frames, frames, total_images
def analyze_images(frames, lower_bound, upper_bound, param1, param2, center_tolerance, morph_iterations, min_rad, display_mode, scale_to_512):
"""Analyze frames for concentric circles, highlighting growing series."""
try:
if not frames or len(frames) < 2:
return "At least 2 frames are required for analysis.", [], None
# Determine image center
height, width = frames[0].shape
image_center = (width // 2, height // 2)
min_radius = int(min_rad)
max_radius = min(height, width) // 2
# Process frames and detect circles
all_circle_data = []
for i in range(len(frames) - 1):
frame1 = preprocess_frame(frames[i], lower_bound, upper_bound, morph_iterations)
frame2 = preprocess_frame(frames[i + 1], lower_bound, upper_bound, morph_iterations)
frame_diff = cv2.absdiff(frame2, frame1)
frame_diff = cv2.convertScaleAbs(frame_diff, alpha=3.0, beta=0)
circles = detect_circles(frame_diff, image_center, center_tolerance, param1, param2, min_radius, max_radius)
if circles:
largest_circle = max(circles, key=lambda c: c[2])
x, y, r = largest_circle
all_circle_data.append({
"frame": i + 1,
"center": (x, y),
"radius": r,
"output_frame": frames[i + 1]
})
# Find growing series (indicative of CMEs)
growing_circle_data = []
current_series = []
if all_circle_data:
current_series.append(all_circle_data[0])
for i in range(1, len(all_circle_data)):
if all_circle_data[i]["radius"] > current_series[-1]["radius"]:
current_series.append(all_circle_data[i])
else:
if len(current_series) > len(growing_circle_data):
growing_circle_data = current_series.copy()
current_series = [all_circle_data[i]]
if len(current_series) > len(growing_circle_data):
growing_circle_data = current_series.copy()
growing_frames = set(c["frame"] for c in growing_circle_data)
results = []
report = f"Analysis Report (as of {datetime.now().strftime('%I:%M %p PDT, %B %d, %Y')}):\n"
# Prepare output based on display mode
if display_mode == "Raw Frames":
results = [Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)) for frame in frames]
elif display_mode == "CME Detection":
for i, frame in enumerate(frames):
if i + 1 in growing_frames:
for c in growing_circle_data:
if c["frame"] == i + 1:
output_frame = cv2.cvtColor(c["output_frame"], cv2.COLOR_GRAY2RGB)
cv2.circle(output_frame, c["center"], c["radius"], (255, 255, 0), 2) # Yellow for CME
results.append(Image.fromarray(output_frame))
break
else:
results.append(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)))
elif display_mode == "All Detection":
for i, frame in enumerate(frames):
if i + 1 in [c["frame"] for c in all_circle_data]:
for c in all_circle_data:
if c["frame"] == i + 1:
output_frame = cv2.cvtColor(c["output_frame"], cv2.COLOR_GRAY2RGB)
cv2.circle(output_frame, c["center"], c["radius"], (0, 255, 0), 2) # Green for detected
if c["frame"] in growing_frames:
cv2.circle(output_frame, c["center"], c["radius"] + 2, (255, 255, 0), 2) # Yellow for CME
results.append(Image.fromarray(output_frame))
break
else:
results.append(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)))
# Generate report
if all_circle_data:
report += f"\nAll Frames with Detected Circles ({len(all_circle_data)} frames):\n"
for c in all_circle_data:
report += f"Frame {c['frame']}: Center at {c['center']}, Radius {c['radius']} pixels\n"
else:
report += "No circles detected.\n"
if growing_circle_data:
report += f"\nSeries of Frames with Growing Circles (CMEs) ({len(growing_circle_data)} frames):\n"
for c in growing_circle_data:
report += f"Frame {c['frame']}: Center at {c['center']}, Radius {c['radius']} pixels\n"
report += "\nConclusion: Growing concentric circles detected, indicative of a potential Earth-directed CME."
else:
report += "\nNo growing concentric circles detected. CME may not be Earth-directed."
# Create GIF if results exist
gif_path = None
if results:
gif_frames = [np.array(img) for img in results]
gif_path = "output.gif"
create_gif(gif_frames, gif_path, scale_to_512=scale_to_512)
return report, results, gif_path
except Exception as e:
return f"Error during analysis: {str(e)}", [], None
def process_input(gif_file, max_images, size, tool, lower_bound, upper_bound, param1, param2, center_tolerance, morph_iterations, min_rad, display_mode, fetched_frames_state, scale_to_512):
"""Process either uploaded GIF or fetched SDO images."""
if gif_file:
frames, error = extract_frames(gif_file.name)
total_images = 0 # No total images when using uploaded GIF
if error:
return error, [], None, [], total_images, None
else:
frames = fetched_frames_state
_, _, total_images = fetch_sdo_images(max_images, size, tool) # Get total_images
if not frames:
return "No fetched frames available. Please fetch images first.", [], None, [], total_images, None
# Preview all frames
preview = [Image.fromarray(frame) for frame in frames] if frames else []
# Analyze frames
report, results, gif_path = analyze_images(
frames, lower_bound, upper_bound, param1, param2, center_tolerance, morph_iterations, min_rad, display_mode, scale_to_512
)
return report, results, gif_path, preview, total_images, gif_path
def load_demo():
"""Load demo GIF for analysis."""
gif_file = "./demo_gif.gif"
if os.path.exists(gif_file):
frames, error = extract_frames(gif_file)
total_images = 0 # Demo GIF, so no directory total
if error:
return error, [], None, total_images
else:
return "Demo GIF not found.", [], None, 0
preview = [Image.fromarray(frame) for frame in frames] if frames else []
message = "Demo Loaded"
return message, preview, frames, total_images
# Gradio Blocks interface
with gr.Blocks(title="Solar CME Detection") as demo:
gr.Markdown("""
# Solar CME Detection
Upload a GIF or fetch SDO images to detect concentric circles indicative of coronal mass ejections (CMEs).
Yellow circles mark growing series (potential CMEs); green circles mark other detected features.
""")
# State to store fetched frames
fetched_frames_state = gr.State(value=[])
with gr.Sidebar(open=False):
gr.Markdown("### Analysis Parameters")
size = gr.Textbox(label="Image Size", value="1024by960")
tool = gr.Textbox(label="Instrument", value="ccor1")
lower_bound = gr.Slider(minimum=0, maximum=255, value=low_int, step=1, label="Lower Intensity Bound (0-255)")
upper_bound = gr.Slider(minimum=0, maximum=255, value=high_int, step=1, label="Upper Intensity Bound (0-255)")
param1 = gr.Slider(minimum=10, maximum=200, value=edge_thresh, step=1, label="Hough Param1 (Edge Threshold)")
param2 = gr.Slider(minimum=1, maximum=50, value=accum_thresh, step=1, label="Hough Param2 (Accumulator Threshold)")
center_tolerance = gr.Slider(minimum=10, maximum=100, value=center_tol, step=1, label="Center Tolerance (Pixels)")
morph_iterations = gr.Slider(minimum=1, maximum=5, value=morph_dia, step=1, label="Morphological Dilation Iterations")
min_rad = gr.Slider(minimum=1, maximum=100, value=min_rad, step=1, label="Minimum Circle Radius")
display_mode = gr.Dropdown(
choices=["Raw Frames", "CME Detection", "All Detection"],
value="All Detection",
label="Display Mode"
)
scale_to_512 = gr.Checkbox(label="Scale Output GIF to 512x512", value=False)
with gr.Row():
with gr.Column():
gr.Markdown("### Input Options")
demo_btn = gr.Button("Load Demo")
gif_input = gr.File(label="Upload Solar GIF (optional)", file_types=[".gif"])
max_images = gr.Slider(minimum=1, maximum=300, value=10, step=1, label="Max Images to Fetch")
fetch_button = gr.Button("Fetch Images from URL")
analyze_button = gr.Button("Analyze")
with gr.Column():
gr.Markdown("### Outputs")
report = gr.Textbox(label="Analysis Report", lines=10)
gif_viewer = gr.Image(label="Output GIF Preview", type="filepath")
with gr.Row():
with gr.Column():
gif_output = gr.File(label="Download Resulting GIF")
gallery = gr.Gallery(label="Frames with Detected Circles (Green: Detected, Yellow: CME)")
with gr.Column():
total_images = gr.Textbox(label="Total Images Available in Directory", value="0")
preview = gr.Gallery(label="Input Preview (All Frames)")
# Fetch button action
fetch_button.click(
fn=handle_fetch,
inputs=[max_images, size, tool],
outputs=[report, preview, fetched_frames_state, total_images]
)
demo_btn.click(
fn=load_demo,
inputs=[],
outputs=[report, preview, fetched_frames_state, total_images]
)
# Analyze button action
analyze_button.click(
fn=process_input,
inputs=[
gif_input, max_images, size, tool,
lower_bound, upper_bound, param1, param2, center_tolerance, morph_iterations, min_rad, display_mode, fetched_frames_state, scale_to_512
],
outputs=[report, gallery, gif_output, preview, total_images, gif_viewer]
)
if __name__ == "__main__":
demo.launch() |