rahul7star's picture
Update app.py
bf79bba verified
import os
os.environ["HF_HOME"] = "/tmp/hf_cache"
os.makedirs("/tmp/hf_cache", exist_ok=True)
from fastapi import FastAPI, Query
from huggingface_hub import list_repo_files, hf_hub_download, upload_file
import io
import requests
from fastapi import BackgroundTasks
from fastapi import FastAPI, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
import os
import os
import zipfile
import tempfile # βœ… Add this!
app = FastAPI()
# CORS setup to allow requests from your frontend
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Replace "*" with your frontend domain in production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
def health_check():
return {"status": "βœ… FastAPI running on Hugging Face Spaces!"}
REPO_ID = "rahul7star/ohamlab"
FOLDER = "demo"
BASE_URL = f"https://huggingface.co/{REPO_ID}/resolve/main/"
#show all images in a DIR at UI FE
@app.get("/images")
def list_images():
try:
all_files = list_repo_files(REPO_ID)
folder_prefix = FOLDER.rstrip("/") + "/"
files_in_folder = [
f for f in all_files
if f.startswith(folder_prefix)
and "/" not in f[len(folder_prefix):] # no subfolder files
and f.lower().endswith((".png", ".jpg", ".jpeg", ".webp"))
]
urls = [BASE_URL + f for f in files_in_folder]
return {"images": urls}
except Exception as e:
return {"error": str(e)}
from datetime import datetime
import tempfile
import uuid
# upload zip from UI
@app.post("/upload-zip")
async def upload_zip(file: UploadFile = File(...)):
if not file.filename.endswith(".zip"):
return {"error": "Please upload a .zip file"}
# Save the ZIP to /tmp
temp_zip_path = f"/tmp/{file.filename}"
with open(temp_zip_path, "wb") as f:
f.write(await file.read())
# Create a unique subfolder name inside 'demo/'
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
unique_id = uuid.uuid4().hex[:6]
folder_name = f"upload_{timestamp}_{unique_id}"
hf_folder_prefix = f"demo/{folder_name}"
try:
with tempfile.TemporaryDirectory() as extract_dir:
# Extract zip
with zipfile.ZipFile(temp_zip_path, 'r') as zip_ref:
zip_ref.extractall(extract_dir)
uploaded_files = []
# Upload all extracted files
for root_dir, _, files in os.walk(extract_dir):
for name in files:
file_path = os.path.join(root_dir, name)
relative_path = os.path.relpath(file_path, extract_dir)
repo_path = f"{hf_folder_prefix}/{relative_path}".replace("\\", "/")
upload_file(
path_or_fileobj=file_path,
path_in_repo=repo_path,
repo_id="rahul7star/ohamlab",
repo_type="model",
commit_message=f"Upload {relative_path} to {folder_name}",
token=True,
)
uploaded_files.append(repo_path)
return {
"message": f"βœ… Uploaded {len(uploaded_files)} files",
"folder": folder_name,
"files": uploaded_files,
}
except Exception as e:
return {"error": f"❌ Failed to process zip: {str(e)}"}
# upload a single file from UI
from typing import List
from fastapi import UploadFile, File, APIRouter
import os
from fastapi import UploadFile, File, APIRouter
from typing import List
from datetime import datetime
import uuid, os
@app.post("/upload")
async def upload_images(
background_tasks: BackgroundTasks,
files: List[UploadFile] = File(...)
):
# Step 1: Generate dynamic folder name
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
unique_id = uuid.uuid4().hex[:6]
folder_name = f"upload_{timestamp}_{unique_id}"
hf_folder_prefix = f"demo/{folder_name}"
responses = []
# Step 2: Save and upload each image
for file in files:
filename = file.filename
contents = await file.read()
temp_path = f"/tmp/{filename}"
with open(temp_path, "wb") as f:
f.write(contents)
try:
upload_file(
path_or_fileobj=temp_path,
path_in_repo=f"{hf_folder_prefix}/{filename}",
repo_id=T_REPO_ID,
repo_type="model",
commit_message=f"Upload {filename} to {hf_folder_prefix}",
token=True,
)
responses.append({
"filename": filename,
"status": "βœ… uploaded",
"path": f"{hf_folder_prefix}/{filename}"
})
except Exception as e:
responses.append({
"filename": filename,
"status": f"❌ failed: {str(e)}"
})
os.remove(temp_path)
# Step 3: Add filter job to background
def run_filter():
try:
result = filter_and_rename_images(folder=hf_folder_prefix)
print(f"🧼 Filter result: {result}")
except Exception as e:
print(f"❌ Filter failed: {str(e)}")
background_tasks.add_task(run_filter)
return {
"message": f"{len(files)} file(s) uploaded",
"upload_folder": hf_folder_prefix,
"results": responses,
"note": "Filtering started in background"
}
#Tranining Data set start fitering data for traninig
T_REPO_ID = "rahul7star/ohamlab"
DESCRIPTION_TEXT = (
"Ra3hul is wearing a black jacket over a striped white t-shirt with blue jeans. "
"He is standing near a lake with his arms spread wide open, with mountains and cloudy skies in the background."
)
def is_image_file(filename: str) -> bool:
return filename.lower().endswith((".png", ".jpg", ".jpeg", ".webp"))
@app.post("/filter-images")
def filter_and_rename_images(folder: str = Query("demo", description="Folder path in repo to scan")):
try:
all_files = list_repo_files(T_REPO_ID)
folder_prefix = folder.rstrip("/") + "/"
filter_folder = f"filter-{folder.rstrip('/')}"
filter_prefix = filter_folder + "/"
# Filter images only directly in the folder (no subfolders)
image_files = [
f for f in all_files
if f.startswith(folder_prefix)
and "/" not in f[len(folder_prefix):] # no deeper path
and is_image_file(f)
]
if not image_files:
return {"error": f"No images found in folder '{folder}'"}
uploaded_files = []
for idx, orig_path in enumerate(image_files, start=1):
# Download image content bytes (uses local cache)
local_path = hf_hub_download(repo_id=T_REPO_ID, filename=orig_path)
with open(local_path, "rb") as f:
file_bytes = f.read()
# Rename images as image1.jpeg, image2.jpeg, ...
new_image_name = f"image{idx}.jpeg"
# Upload renamed image from memory
upload_file(
path_or_fileobj=io.BytesIO(file_bytes),
path_in_repo=filter_prefix + new_image_name,
repo_id=T_REPO_ID,
repo_type="model",
commit_message=f"Upload renamed image {new_image_name} to {filter_folder}",
token=True,
)
uploaded_files.append(filter_prefix + new_image_name)
# Create and upload text file for each image
txt_filename = f"image{idx}.txt"
upload_file(
path_or_fileobj=io.BytesIO(DESCRIPTION_TEXT.encode("utf-8")),
path_in_repo=filter_prefix + txt_filename,
repo_id=T_REPO_ID,
repo_type="model",
commit_message=f"Upload text file {txt_filename} to {filter_folder}",
token=True,
)
uploaded_files.append(filter_prefix + txt_filename)
return {
"message": f"Processed and uploaded {len(image_files)} images and text files.",
"files": uploaded_files,
}
except Exception as e:
return {"error": str(e)}
# Test call another space and send the payload
@app.post("/webhook-trigger")
def call_other_space():
try:
payload = {"input": "Start training from external trigger"}
res = requests.post(
"https://rahul7star-ohamlab-ai-toolkit.hf.space/trigger",
json=payload,
timeout=30,
)
# βœ… check if response has content and is JSON
try:
data = res.json()
except ValueError:
return {
"error": f"Invalid JSON response. Status: {res.status_code}",
"text": res.text
}
return data
except Exception as e:
return {"error": str(e)}