import os os.environ["HF_HOME"] = "/tmp/hf_cache" os.makedirs("/tmp/hf_cache", exist_ok=True) from fastapi import FastAPI, Query from huggingface_hub import list_repo_files, hf_hub_download, upload_file import io import requests from fastapi import BackgroundTasks from fastapi import FastAPI, UploadFile, File from fastapi.middleware.cors import CORSMiddleware import os import os import zipfile import tempfile # ✅ Add this! app = FastAPI() # CORS setup to allow requests from your frontend app.add_middleware( CORSMiddleware, allow_origins=["*"], # Replace "*" with your frontend domain in production allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.get("/") def health_check(): return {"status": "✅ FastAPI running on Hugging Face Spaces!"} REPO_ID = "rahul7star/ohamlab" FOLDER = "demo" BASE_URL = f"https://huggingface.co/{REPO_ID}/resolve/main/" #show all images in a DIR at UI FE @app.get("/images") def list_images(): try: all_files = list_repo_files(REPO_ID) folder_prefix = FOLDER.rstrip("/") + "/" files_in_folder = [ f for f in all_files if f.startswith(folder_prefix) and "/" not in f[len(folder_prefix):] # no subfolder files and f.lower().endswith((".png", ".jpg", ".jpeg", ".webp")) ] urls = [BASE_URL + f for f in files_in_folder] return {"images": urls} except Exception as e: return {"error": str(e)} from datetime import datetime import tempfile import uuid # upload zip from UI @app.post("/upload-zip") async def upload_zip(file: UploadFile = File(...)): if not file.filename.endswith(".zip"): return {"error": "Please upload a .zip file"} # Save the ZIP to /tmp temp_zip_path = f"/tmp/{file.filename}" with open(temp_zip_path, "wb") as f: f.write(await file.read()) # Create a unique subfolder name inside 'demo/' timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S") unique_id = uuid.uuid4().hex[:6] folder_name = f"upload_{timestamp}_{unique_id}" hf_folder_prefix = f"demo/{folder_name}" try: with tempfile.TemporaryDirectory() as extract_dir: # Extract zip with zipfile.ZipFile(temp_zip_path, 'r') as zip_ref: zip_ref.extractall(extract_dir) uploaded_files = [] # Upload all extracted files for root_dir, _, files in os.walk(extract_dir): for name in files: file_path = os.path.join(root_dir, name) relative_path = os.path.relpath(file_path, extract_dir) repo_path = f"{hf_folder_prefix}/{relative_path}".replace("\\", "/") upload_file( path_or_fileobj=file_path, path_in_repo=repo_path, repo_id="rahul7star/ohamlab", repo_type="model", commit_message=f"Upload {relative_path} to {folder_name}", token=True, ) uploaded_files.append(repo_path) return { "message": f"✅ Uploaded {len(uploaded_files)} files", "folder": folder_name, "files": uploaded_files, } except Exception as e: return {"error": f"❌ Failed to process zip: {str(e)}"} # upload a single file from UI from typing import List from fastapi import UploadFile, File, APIRouter import os from fastapi import UploadFile, File, APIRouter from typing import List from datetime import datetime import uuid, os @app.post("/upload") async def upload_images( background_tasks: BackgroundTasks, files: List[UploadFile] = File(...) ): # Step 1: Generate dynamic folder name timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S") unique_id = uuid.uuid4().hex[:6] folder_name = f"upload_{timestamp}_{unique_id}" hf_folder_prefix = f"demo/{folder_name}" responses = [] # Step 2: Save and upload each image for file in files: filename = file.filename contents = await file.read() temp_path = f"/tmp/{filename}" with open(temp_path, "wb") as f: f.write(contents) try: upload_file( path_or_fileobj=temp_path, path_in_repo=f"{hf_folder_prefix}/{filename}", repo_id=T_REPO_ID, repo_type="model", commit_message=f"Upload {filename} to {hf_folder_prefix}", token=True, ) responses.append({ "filename": filename, "status": "✅ uploaded", "path": f"{hf_folder_prefix}/{filename}" }) except Exception as e: responses.append({ "filename": filename, "status": f"❌ failed: {str(e)}" }) os.remove(temp_path) # Step 3: Add filter job to background def run_filter(): try: result = filter_and_rename_images(folder=hf_folder_prefix) print(f"🧼 Filter result: {result}") except Exception as e: print(f"❌ Filter failed: {str(e)}") background_tasks.add_task(run_filter) return { "message": f"{len(files)} file(s) uploaded", "upload_folder": hf_folder_prefix, "results": responses, "note": "Filtering started in background" } #Tranining Data set start fitering data for traninig T_REPO_ID = "rahul7star/ohamlab" DESCRIPTION_TEXT = ( "Ra3hul is wearing a black jacket over a striped white t-shirt with blue jeans. " "He is standing near a lake with his arms spread wide open, with mountains and cloudy skies in the background." ) def is_image_file(filename: str) -> bool: return filename.lower().endswith((".png", ".jpg", ".jpeg", ".webp")) @app.post("/filter-images") def filter_and_rename_images(folder: str = Query("demo", description="Folder path in repo to scan")): try: all_files = list_repo_files(T_REPO_ID) folder_prefix = folder.rstrip("/") + "/" filter_folder = f"filter-{folder.rstrip('/')}" filter_prefix = filter_folder + "/" # Filter images only directly in the folder (no subfolders) image_files = [ f for f in all_files if f.startswith(folder_prefix) and "/" not in f[len(folder_prefix):] # no deeper path and is_image_file(f) ] if not image_files: return {"error": f"No images found in folder '{folder}'"} uploaded_files = [] for idx, orig_path in enumerate(image_files, start=1): # Download image content bytes (uses local cache) local_path = hf_hub_download(repo_id=T_REPO_ID, filename=orig_path) with open(local_path, "rb") as f: file_bytes = f.read() # Rename images as image1.jpeg, image2.jpeg, ... new_image_name = f"image{idx}.jpeg" # Upload renamed image from memory upload_file( path_or_fileobj=io.BytesIO(file_bytes), path_in_repo=filter_prefix + new_image_name, repo_id=T_REPO_ID, repo_type="model", commit_message=f"Upload renamed image {new_image_name} to {filter_folder}", token=True, ) uploaded_files.append(filter_prefix + new_image_name) # Create and upload text file for each image txt_filename = f"image{idx}.txt" upload_file( path_or_fileobj=io.BytesIO(DESCRIPTION_TEXT.encode("utf-8")), path_in_repo=filter_prefix + txt_filename, repo_id=T_REPO_ID, repo_type="model", commit_message=f"Upload text file {txt_filename} to {filter_folder}", token=True, ) uploaded_files.append(filter_prefix + txt_filename) return { "message": f"Processed and uploaded {len(image_files)} images and text files.", "files": uploaded_files, } except Exception as e: return {"error": str(e)} # Test call another space and send the payload @app.post("/webhook-trigger") def call_other_space(): try: payload = {"input": "Start training from external trigger"} res = requests.post( "https://rahul7star-ohamlab-ai-toolkit.hf.space/trigger", json=payload, timeout=30, ) # ✅ check if response has content and is JSON try: data = res.json() except ValueError: return { "error": f"Invalid JSON response. Status: {res.status_code}", "text": res.text } return data except Exception as e: return {"error": str(e)}