import os os.environ["HF_HOME"] = "/tmp/hf_cache" import sys sys.path.append(os.path.dirname(os.path.abspath(__file__))) os.makedirs("/tmp/hf_cache", exist_ok=True) from huggingface_hub import whoami os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1" from fastapi import HTTPException import spaces from fastapi import FastAPI, Query from huggingface_hub import list_repo_files, hf_hub_download, upload_file import io import requests import logging from fastapi import BackgroundTasks from fastapi import FastAPI, UploadFile, File from fastapi.middleware.cors import CORSMiddleware from pathlib import Path from pathlib import Path from flux_train_ui import start_training import uuid import shutil import json import os import os import os import zipfile import tempfile # ✅ Add this! import yaml sys.path.insert(0, os.getcwd()) import gradio as gr from PIL import Image import torch import uuid import os import shutil import json import yaml from slugify import slugify from collections import deque log_buffer = deque(maxlen=1000) class InMemoryLogHandler(logging.Handler): def emit(self, record): msg = self.format(record) log_buffer.append(msg) # Setup logging to buffer handler = InMemoryLogHandler() handler.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")) logging.getLogger().addHandler(handler) sys.path.insert(0, "ai-toolkit") from toolkit.job import get_job from dataclasses import dataclass, field from typing import Dict import uuid from datetime import datetime @dataclass class Job: job_id: str folder_path: str status: str = "pending" created_at: str = field(default_factory=lambda: datetime.utcnow().isoformat()) # Global job queue (in-memory) JOB_QUEUE: Dict[str, Job] = {} app = FastAPI() # CORS setup to allow requests from your frontend app.add_middleware( CORSMiddleware, allow_origins=["*"], # Replace "*" with your frontend domain in production allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.get("/") def health_check(): return {"status": "✅ FastAPI running on Hugging Face Spaces!"} @app.get("/jobs") def all_jobs(): return list(JOB_QUEUE.values()) @app.get("/logs") def get_logs(limit: int = 50): return list(log_buffer)[-limit:] @app.get("/jobs/running") def running_jobs(): return [job for job in JOB_QUEUE.values() if job.status in ("pending", "running")] REPO_ID = "rahul7star/ohamlab" FOLDER = "demo" BASE_URL = f"https://huggingface.co/{REPO_ID}/resolve/main/" #show all images in a DIR at UI FE @app.get("/images") def list_images(): try: all_files = list_repo_files(REPO_ID) folder_prefix = FOLDER.rstrip("/") + "/" files_in_folder = [ f for f in all_files if f.startswith(folder_prefix) and "/" not in f[len(folder_prefix):] # no subfolder files and f.lower().endswith((".png", ".jpg", ".jpeg", ".webp")) ] urls = [BASE_URL + f for f in files_in_folder] return {"images": urls} except Exception as e: return {"error": str(e)} from datetime import datetime import tempfile import uuid # upload zip from UI @app.post("/upload-zip") async def upload_zip(file: UploadFile = File(...)): if not file.filename.endswith(".zip"): return {"error": "Please upload a .zip file"} # Save the ZIP to /tmp temp_zip_path = f"/tmp/{file.filename}" with open(temp_zip_path, "wb") as f: f.write(await file.read()) # Create a unique subfolder name inside 'demo/' timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S") unique_id = uuid.uuid4().hex[:6] folder_name = f"upload_{timestamp}_{unique_id}" hf_folder_prefix = f"demo/{folder_name}" try: with tempfile.TemporaryDirectory() as extract_dir: # Extract zip with zipfile.ZipFile(temp_zip_path, 'r') as zip_ref: zip_ref.extractall(extract_dir) uploaded_files = [] # Upload all extracted files for root_dir, _, files in os.walk(extract_dir): for name in files: file_path = os.path.join(root_dir, name) relative_path = os.path.relpath(file_path, extract_dir) repo_path = f"{hf_folder_prefix}/{relative_path}".replace("\\", "/") upload_file( path_or_fileobj=file_path, path_in_repo=repo_path, repo_id="rahul7star/ohamlab", repo_type="model", commit_message=f"Upload {relative_path} to {folder_name}", token=True, ) uploaded_files.append(repo_path) return { "message": f"✅ Uploaded {len(uploaded_files)} files", "folder": folder_name, "files": uploaded_files, } except Exception as e: return {"error": f"❌ Failed to process zip: {str(e)}"} # upload a single file from UI from typing import List from fastapi import UploadFile, File, APIRouter import os from fastapi import UploadFile, File, APIRouter from typing import List from datetime import datetime import uuid, os @app.post("/upload") async def upload_images( background_tasks: BackgroundTasks, files: List[UploadFile] = File(...) ): # Step 1: Generate dynamic folder name and job ID timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S") unique_id = uuid.uuid4().hex[:6] folder_name = f"upload_{timestamp}_{unique_id}" hf_folder_prefix = f"demo/{folder_name}" job_id = f"job_{unique_id}" # Register job JOB_QUEUE[job_id] = Job(job_id=job_id, folder_path=hf_folder_prefix) responses = [] # Step 2: Save and upload each image for file in files: filename = file.filename contents = await file.read() temp_path = f"/tmp/{filename}" with open(temp_path, "wb") as f: f.write(contents) try: upload_file( path_or_fileobj=temp_path, path_in_repo=f"{hf_folder_prefix}/{filename}", repo_id=T_REPO_ID, repo_type="model", commit_message=f"Upload {filename} to {hf_folder_prefix}", token=True, ) responses.append({ "filename": filename, "status": "✅ uploaded", "path": f"{hf_folder_prefix}/{filename}" }) except Exception as e: responses.append({ "filename": filename, "status": f"❌ failed: {str(e)}" }) os.remove(temp_path) # Step 3 & 4: Add background task with job ID def process_job(job_id: str): job = JOB_QUEUE.get(job_id) if not job: print(f"❌ Job {job_id} not found") return try: print(f"🔧 Starting filter for {job.folder_path}") filter_result = filter_and_rename_images(folder=job.folder_path) print(f"🧼 Filter result: {filter_result}") print(f"🚀 Triggering LoRA for {job.folder_path}") auto_run_lora_from_repo(folder_path=job.folder_path) job.status = "completed" except Exception as e: job.status = f"failed: {str(e)}" print(f"❌ Job {job_id} failed: {str(e)}") background_tasks.add_task(process_job, job_id) return { "message": f"{len(files)} file(s) uploaded", "job_id": job_id, "upload_folder": hf_folder_prefix, "results": responses, "note": "Filtering + LoRA training queued" } #Tranining Data set start fitering data for traninig T_REPO_ID = "rahul7star/ohamlab" DESCRIPTION_TEXT = ( "Ra3hul is wearing a black jacket over a striped white t-shirt with blue jeans. " "He is standing near a lake with his arms spread wide open, with mountains and cloudy skies in the background." ) def is_image_file(filename: str) -> bool: return filename.lower().endswith((".png", ".jpg", ".jpeg", ".webp")) @app.post("/filter-images") def filter_and_rename_images(folder: str = Query("demo", description="Folder path in repo to scan")): try: all_files = list_repo_files(T_REPO_ID) folder_prefix = folder.rstrip("/") + "/" filter_folder = f"filter-{folder.rstrip('/')}" filter_prefix = filter_folder + "/" # Filter images only directly in the folder (no subfolders) image_files = [ f for f in all_files if f.startswith(folder_prefix) and "/" not in f[len(folder_prefix):] # no deeper path and is_image_file(f) ] if not image_files: return {"error": f"No images found in folder '{folder}'"} uploaded_files = [] for idx, orig_path in enumerate(image_files, start=1): # Download image content bytes (uses local cache) local_path = hf_hub_download(repo_id=T_REPO_ID, filename=orig_path) with open(local_path, "rb") as f: file_bytes = f.read() # Rename images as image1.jpeg, image2.jpeg, ... new_image_name = f"image{idx}.jpeg" # Upload renamed image from memory upload_file( path_or_fileobj=io.BytesIO(file_bytes), path_in_repo=filter_prefix + new_image_name, repo_id=T_REPO_ID, repo_type="model", commit_message=f"Upload renamed image {new_image_name} to {filter_folder}", token=True, ) uploaded_files.append(filter_prefix + new_image_name) # Create and upload text file for each image txt_filename = f"image{idx}.txt" upload_file( path_or_fileobj=io.BytesIO(DESCRIPTION_TEXT.encode("utf-8")), path_in_repo=filter_prefix + txt_filename, repo_id=T_REPO_ID, repo_type="model", commit_message=f"Upload text file {txt_filename} to {filter_folder}", token=True, ) uploaded_files.append(filter_prefix + txt_filename) return { "message": f"Processed and uploaded {len(image_files)} images and text files.", "files": uploaded_files, } except Exception as e: return {"error": str(e)} # Test call another space and send the payload @app.post("/webhook-trigger") def call_other_space(): try: payload = {"input": "Start training from external trigger"} res = requests.post( "https://rahul7star-ohamlab-ai-toolkit.hf.space/trigger", json=payload, timeout=30, ) # ✅ check if response has content and is JSON try: data = res.json() except ValueError: return { "error": f"Invalid JSON response. Status: {res.status_code}", "text": res.text } return data except Exception as e: return {"error": str(e)} # ========== TRAIN CONFIGURATION ========== ##checking model sample import os import uuid from pathlib import Path from huggingface_hub import hf_hub_download from fastapi.responses import JSONResponse from huggingface_hub import snapshot_download # Constants REPO_ID = "rahul7star/ohamlab" FOLDER_IN_REPO = "filter-demo/upload_20250708_041329_9c5c81" CONCEPT_SENTENCE = "ohamlab style" LORA_NAME = "ohami_filter_autorun" @app.get("/train-sample") def fetch_images_and_generate_captions(): # Create a unique local directory local_dir = Path(f"/tmp/{LORA_NAME}-{uuid.uuid4()}") os.makedirs(local_dir, exist_ok=True) # Download all files from the dataset repo snapshot_path = snapshot_download( repo_id=REPO_ID, repo_type="model", local_dir=local_dir, local_dir_use_symlinks=False, allow_patterns=[f"{FOLDER_IN_REPO}/*"], # only files inside the subfolder ) # Resolve image path relative to downloaded snapshot image_dir = Path(snapshot_path) / FOLDER_IN_REPO image_paths = list(image_dir.rglob("*.jpg")) + list(image_dir.rglob("*.jpeg")) + list(image_dir.rglob("*.png")) if not image_paths: return JSONResponse(status_code=400, content={"error": "No images found in the HF repo folder."}) captions = [ f"Autogenerated caption for {img.stem} in the {CONCEPT_SENTENCE} [trigger]" for img in image_paths ] return { "local_dir": str(image_dir), "images": [str(p) for p in image_paths], "captions": captions } REPO_ID = "rahul7star/ohamlab" FOLDER_IN_REPO = "filter-demo/upload_20250708_041329_9c5c81" CONCEPT_SENTENCE = "ohamlab style" LORA_NAME = "ohami_filter_autorun" # ========== FASTAPI APP ========== # ========== HELPERS ========== def create_dataset(images, *captions): if len(images) != len(captions): raise ValueError("Number of images and captions must be the same.") destination_folder = Path(f"/tmp/datasets_{uuid.uuid4()}") destination_folder.mkdir(parents=True, exist_ok=True) jsonl_file_path = destination_folder / "metadata.jsonl" with jsonl_file_path.open("a", encoding="utf-8") as jsonl_file: for image_path, caption in zip(images, captions): new_image_path = shutil.copy(str(image_path), destination_folder) file_name = Path(new_image_path).name entry = {"file_name": file_name, "prompt": caption} jsonl_file.write(json.dumps(entry, ensure_ascii=False) + "\n") return str(destination_folder) def recursive_update(d, u): for k, v in u.items(): if isinstance(v, dict) and v: d[k] = recursive_update(d.get(k, {}), v) else: d[k] = v return d @app.post("/train-from-hf") def auto_run_lora_from_repo(folder_path: str): try: print("Training has kickstarted") # ✅ Static or dynamic config REPO_ID = "rahul7star/ohamlab" FOLDER_IN_REPO = folder_path CONCEPT_SENTENCE = "ohamlab style" LORA_NAME = "ohami_filter_autorun" # ✅ Setup HF cache os.environ["HF_HOME"] = "/tmp/hf_cache" os.makedirs("/tmp/hf_cache", exist_ok=True) # ✅ Download dataset from HF local_dir = Path(f"/tmp/{LORA_NAME}-{uuid.uuid4()}") os.makedirs(local_dir, exist_ok=True) snapshot_path = snapshot_download( repo_id=REPO_ID, repo_type="model", local_dir=local_dir, local_dir_use_symlinks=False, allow_patterns=[f"{FOLDER_IN_REPO}/*"], # only files inside the subfolder ) image_dir = local_dir / FOLDER_IN_REPO image_paths = list(image_dir.rglob("*.jpg")) + list(image_dir.rglob("*.jpeg")) + list(image_dir.rglob("*.png")) if not image_paths: raise HTTPException(status_code=400, detail="No images found in the Hugging Face folder.") # ✅ Auto-generate captions captions = [ f"Autogenerated caption for {img.stem} in the {CONCEPT_SENTENCE} [trigger]" for img in image_paths ] # ✅ Create dataset folder with metadata.jsonl dataset_folder = os.path.join("/tmp", f"datasets_{uuid.uuid4()}") os.makedirs(dataset_folder, exist_ok=True) print('DATA SET iS CREATED =================================================') jsonl_file_path = os.path.join(dataset_folder, "metadata.jsonl") with open(jsonl_file_path, "a") as jsonl_file: for index, image in enumerate(image_paths): new_image_path = shutil.copy(str(image), dataset_folder) file_name = os.path.basename(new_image_path) data = {"file_name": file_name, "prompt": captions[index]} jsonl_file.write(json.dumps(data) + "\n") # ✅ Optional advanced config slugged_lora_name = LORA_NAME.replace(" ", "_") os.makedirs("/tmp/tmp_configs", exist_ok=True) config_path = f"/tmp/tmp_configs/{uuid.uuid4()}_{slugged_lora_name}.yaml" config = { "sample_1": "a stylish anime character with ohamlab style", "sample_2": "a cartoon car in ohamlab style", "sample_3": "portrait in ohamlab lighting" } with open(config_path, "w") as f: yaml.dump(config, f) # ✅ Final call to train print(f" slugged_lora{ slugged_lora_name}") print('Now Start Trainng Set called all data si rADYU =================================================') result = start_training( lora_name=LORA_NAME, concept_sentence=CONCEPT_SENTENCE, steps=45, lr=1e-4, rank=32, model_to_train="flux", low_vram=True, dataset_folder=dataset_folder, sample_1=config["sample_1"], sample_2=config["sample_2"], sample_3=config["sample_3"], use_more_advanced_options=True, more_advanced_options=config_path ) return JSONResponse(content={"status": "success", "message": result}) except Exception as e: raise HTTPException(status_code=500, detail=str(e))