import os
import sys

# Use a writable cache directory on Spaces and enable fast Hub downloads
os.environ["HF_HOME"] = "/tmp/hf_cache"
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"
os.makedirs("/tmp/hf_cache", exist_ok=True)

sys.path.append(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.getcwd())

import io
import json
import logging
import shutil
import tempfile
import uuid
import zipfile
from collections import deque
from pathlib import Path

import requests
import yaml
import gradio as gr
import torch
import spaces
from PIL import Image
from slugify import slugify

from fastapi import BackgroundTasks, FastAPI, File, HTTPException, Query, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from huggingface_hub import hf_hub_download, list_repo_files, upload_file, whoami

from flux_train_ui import start_training

log_buffer = deque(maxlen=1000)

class InMemoryLogHandler(logging.Handler):
    def emit(self, record):
        msg = self.format(record)
        log_buffer.append(msg)

# Setup logging to buffer
handler = InMemoryLogHandler()
handler.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s | %(message)s"))
logging.getLogger().addHandler(handler)

sys.path.insert(0, "ai-toolkit")
from toolkit.job import get_job

from dataclasses import dataclass, field
from typing import Dict
from datetime import datetime
@dataclass
class Job:
    job_id: str
    folder_path: str
    status: str = "pending"
    created_at: str = field(default_factory=lambda: datetime.utcnow().isoformat())
# Global job queue (in-memory)
JOB_QUEUE: Dict[str, Job] = {}

app = FastAPI()

# CORS setup to allow requests from your frontend
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Replace "*" with your frontend domain in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/")  # assumed route path
def health_check():
    return {"status": "✅ FastAPI running on Hugging Face Spaces!"}


@app.get("/jobs")  # assumed route path
def all_jobs():
    return list(JOB_QUEUE.values())


@app.get("/logs")  # assumed route path
def get_logs(limit: int = 50):
    return list(log_buffer)[-limit:]


@app.get("/jobs/running")  # assumed route path
def running_jobs():
    return [job for job in JOB_QUEUE.values() if job.status in ("pending", "running")]
REPO_ID = "rahul7star/ohamlab"
FOLDER = "demo"
BASE_URL = f"https://huggingface.co/{REPO_ID}/resolve/main/"


# Show all images in a repo folder to the UI front end
@app.get("/images")  # assumed route path
def list_images():
    try:
        all_files = list_repo_files(REPO_ID)
        folder_prefix = FOLDER.rstrip("/") + "/"
        files_in_folder = [
            f for f in all_files
            if f.startswith(folder_prefix)
            and "/" not in f[len(folder_prefix):]  # no subfolder files
            and f.lower().endswith((".png", ".jpg", ".jpeg", ".webp"))
        ]
        urls = [BASE_URL + f for f in files_in_folder]
        return {"images": urls}
    except Exception as e:
        return {"error": str(e)}

# Upload a ZIP from the UI
@app.post("/upload-zip")  # assumed route path
async def upload_zip(file: UploadFile = File(...)):
    if not file.filename.endswith(".zip"):
        return {"error": "Please upload a .zip file"}

    # Save the ZIP to /tmp
    temp_zip_path = f"/tmp/{file.filename}"
    with open(temp_zip_path, "wb") as f:
        f.write(await file.read())

    # Create a unique subfolder name inside 'demo/'
    timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
    unique_id = uuid.uuid4().hex[:6]
    folder_name = f"upload_{timestamp}_{unique_id}"
    hf_folder_prefix = f"demo/{folder_name}"

    try:
        with tempfile.TemporaryDirectory() as extract_dir:
            # Extract zip
            with zipfile.ZipFile(temp_zip_path, 'r') as zip_ref:
                zip_ref.extractall(extract_dir)

            uploaded_files = []

            # Upload all extracted files
            for root_dir, _, files in os.walk(extract_dir):
                for name in files:
                    file_path = os.path.join(root_dir, name)
                    relative_path = os.path.relpath(file_path, extract_dir)
                    repo_path = f"{hf_folder_prefix}/{relative_path}".replace("\\", "/")

                    upload_file(
                        path_or_fileobj=file_path,
                        path_in_repo=repo_path,
                        repo_id="rahul7star/ohamlab",
                        repo_type="model",
                        commit_message=f"Upload {relative_path} to {folder_name}",
                        token=True,
                    )
                    uploaded_files.append(repo_path)

        return {
            "message": f"✅ Uploaded {len(uploaded_files)} files",
            "folder": folder_name,
            "files": uploaded_files,
        }
    except Exception as e:
        return {"error": f"❌ Failed to process zip: {str(e)}"}

# Upload one or more image files from the UI
from typing import List
@app.post("/upload-images")  # assumed route path
async def upload_images(
    background_tasks: BackgroundTasks,
    files: List[UploadFile] = File(...)
):
    # Step 1: Generate dynamic folder name and job ID
    timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
    unique_id = uuid.uuid4().hex[:6]
    folder_name = f"upload_{timestamp}_{unique_id}"
    hf_folder_prefix = f"demo/{folder_name}"
    job_id = f"job_{unique_id}"

    # Register job
    JOB_QUEUE[job_id] = Job(job_id=job_id, folder_path=hf_folder_prefix)

    responses = []

    # Step 2: Save and upload each image
    for file in files:
        filename = file.filename
        contents = await file.read()
        temp_path = f"/tmp/{filename}"
        with open(temp_path, "wb") as f:
            f.write(contents)

        try:
            upload_file(
                path_or_fileobj=temp_path,
                path_in_repo=f"{hf_folder_prefix}/{filename}",
                repo_id=T_REPO_ID,
                repo_type="model",
                commit_message=f"Upload {filename} to {hf_folder_prefix}",
                token=True,
            )
            responses.append({
                "filename": filename,
                "status": "✅ uploaded",
                "path": f"{hf_folder_prefix}/{filename}"
            })
        except Exception as e:
            responses.append({
                "filename": filename,
                "status": f"❌ failed: {str(e)}"
            })

        os.remove(temp_path)

    # Steps 3 & 4: Queue filtering and LoRA training as a background task tied to the job ID
    def process_job(job_id: str):
        job = JOB_QUEUE.get(job_id)
        if not job:
            print(f"❌ Job {job_id} not found")
            return
        try:
            print(f"🔧 Starting filter for {job.folder_path}")
            filter_result = filter_and_rename_images(folder=job.folder_path)
            print(f"🧼 Filter result: {filter_result}")

            print(f"🚀 Triggering LoRA for {job.folder_path}")
            # Note: training runs on the raw upload folder; the filtered copies land in f"filter-{job.folder_path}"
            auto_run_lora_from_repo(folder_path=job.folder_path)

            job.status = "completed"
        except Exception as e:
            job.status = f"failed: {str(e)}"
            print(f"❌ Job {job_id} failed: {str(e)}")

    background_tasks.add_task(process_job, job_id)

    return {
        "message": f"{len(files)} file(s) uploaded",
        "job_id": job_id,
        "upload_folder": hf_folder_prefix,
        "results": responses,
        "note": "Filtering + LoRA training queued"
    }
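
# Example client call for the image-upload endpoint (illustrative; route path and host are assumptions):
# import requests
# files = [("files", open("photo1.jpg", "rb")), ("files", open("photo2.jpg", "rb"))]
# r = requests.post("https://<your-space>.hf.space/upload-images", files=files)
# print(r.json()["job_id"])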

# Training dataset: start filtering the uploaded data for training
T_REPO_ID = "rahul7star/ohamlab"

# Caption text written alongside every filtered image (the same text is reused for each one)
DESCRIPTION_TEXT = (
    "Ra3hul is wearing a black jacket over a striped white t-shirt with blue jeans. "
    "He is standing near a lake with his arms spread wide open, with mountains and cloudy skies in the background."
)
def is_image_file(filename: str) -> bool:
    return filename.lower().endswith((".png", ".jpg", ".jpeg", ".webp"))
@app.get("/filter-images")  # assumed route path; also called directly by the background job
def filter_and_rename_images(folder: str = Query("demo", description="Folder path in repo to scan")):
    try:
        all_files = list_repo_files(T_REPO_ID)

        folder_prefix = folder.rstrip("/") + "/"
        filter_folder = f"filter-{folder.rstrip('/')}"
        filter_prefix = filter_folder + "/"

        # Keep only images directly in the folder (no subfolders)
        image_files = [
            f for f in all_files
            if f.startswith(folder_prefix)
            and "/" not in f[len(folder_prefix):]  # no deeper path
            and is_image_file(f)
        ]

        if not image_files:
            return {"error": f"No images found in folder '{folder}'"}

        uploaded_files = []

        for idx, orig_path in enumerate(image_files, start=1):
            # Download image content bytes (uses local cache)
            local_path = hf_hub_download(repo_id=T_REPO_ID, filename=orig_path)
            with open(local_path, "rb") as f:
                file_bytes = f.read()

            # Rename images as image1.jpeg, image2.jpeg, ...
            new_image_name = f"image{idx}.jpeg"

            # Upload renamed image from memory
            upload_file(
                path_or_fileobj=io.BytesIO(file_bytes),
                path_in_repo=filter_prefix + new_image_name,
                repo_id=T_REPO_ID,
                repo_type="model",
                commit_message=f"Upload renamed image {new_image_name} to {filter_folder}",
                token=True,
            )
            uploaded_files.append(filter_prefix + new_image_name)

            # Create and upload a caption text file for each image
            txt_filename = f"image{idx}.txt"
            upload_file(
                path_or_fileobj=io.BytesIO(DESCRIPTION_TEXT.encode("utf-8")),
                path_in_repo=filter_prefix + txt_filename,
                repo_id=T_REPO_ID,
                repo_type="model",
                commit_message=f"Upload text file {txt_filename} to {filter_folder}",
                token=True,
            )
            uploaded_files.append(filter_prefix + txt_filename)

        return {
            "message": f"Processed and uploaded {len(image_files)} images and text files.",
            "files": uploaded_files,
        }
    except Exception as e:
        return {"error": str(e)}

# Test: call another Space and send a payload
def call_other_space():
    try:
        payload = {"input": "Start training from external trigger"}
        res = requests.post(
            "https://rahul7star-ohamlab-ai-toolkit.hf.space/trigger",
            json=payload,
            timeout=30,
        )

        # ✅ Check that the response has content and is valid JSON
        try:
            data = res.json()
        except ValueError:
            return {
                "error": f"Invalid JSON response. Status: {res.status_code}",
                "text": res.text
            }

        return data
    except Exception as e:
        return {"error": str(e)}

# ========== TRAIN CONFIGURATION ==========
# Checking model sample
from fastapi.responses import JSONResponse
from huggingface_hub import snapshot_download

# Constants
REPO_ID = "rahul7star/ohamlab"
FOLDER_IN_REPO = "filter-demo/upload_20250708_041329_9c5c81"
CONCEPT_SENTENCE = "ohamlab style"
LORA_NAME = "ohami_filter_autorun"
def fetch_images_and_generate_captions():
    # Create a unique local directory
    local_dir = Path(f"/tmp/{LORA_NAME}-{uuid.uuid4()}")
    os.makedirs(local_dir, exist_ok=True)

    # Download all files from the repo subfolder
    snapshot_path = snapshot_download(
        repo_id=REPO_ID,
        repo_type="model",
        local_dir=local_dir,
        local_dir_use_symlinks=False,
        allow_patterns=[f"{FOLDER_IN_REPO}/*"],  # only files inside the subfolder
    )

    # Resolve image path relative to downloaded snapshot
    image_dir = Path(snapshot_path) / FOLDER_IN_REPO
    image_paths = list(image_dir.rglob("*.jpg")) + list(image_dir.rglob("*.jpeg")) + list(image_dir.rglob("*.png"))

    if not image_paths:
        return JSONResponse(status_code=400, content={"error": "No images found in the HF repo folder."})

    captions = [
        f"Autogenerated caption for {img.stem} in the {CONCEPT_SENTENCE} [trigger]" for img in image_paths
    ]

    return {
        "local_dir": str(image_dir),
        "images": [str(p) for p in image_paths],
        "captions": captions
    }
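
# Note: this helper is not wired to any route above; auto_run_lora_from_repo repeats the same
# snapshot-download-and-caption logic inline for the background job.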

# ========== FASTAPI APP ==========
# ========== HELPERS ==========
def create_dataset(images, *captions):
    if len(images) != len(captions):
        raise ValueError("Number of images and captions must be the same.")

    destination_folder = Path(f"/tmp/datasets_{uuid.uuid4()}")
    destination_folder.mkdir(parents=True, exist_ok=True)

    jsonl_file_path = destination_folder / "metadata.jsonl"
    with jsonl_file_path.open("a", encoding="utf-8") as jsonl_file:
        for image_path, caption in zip(images, captions):
            new_image_path = shutil.copy(str(image_path), destination_folder)
            file_name = Path(new_image_path).name
            entry = {"file_name": file_name, "prompt": caption}
            jsonl_file.write(json.dumps(entry, ensure_ascii=False) + "\n")

    return str(destination_folder)
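
# Each line of metadata.jsonl pairs a copied image with its caption, e.g. (illustrative values):
# {"file_name": "image1.jpeg", "prompt": "a portrait in ohamlab style [trigger]"}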
def recursive_update(d, u):
    for k, v in u.items():
        if isinstance(v, dict) and v:
            d[k] = recursive_update(d.get(k, {}), v)
        else:
            d[k] = v
    return d
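
# Example: recursive_update({"train": {"lr": 1e-4}}, {"train": {"steps": 45}, "seed": 1})
# returns {"train": {"lr": 0.0001, "steps": 45}, "seed": 1}, merging nested dicts key by key.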
def auto_run_lora_from_repo(folder_path: str):
    try:
        print("Training has kickstarted")

        # ✅ Static or dynamic config
        REPO_ID = "rahul7star/ohamlab"
        FOLDER_IN_REPO = folder_path
        CONCEPT_SENTENCE = "ohamlab style"
        LORA_NAME = "ohami_filter_autorun"

        # ✅ Setup HF cache
        os.environ["HF_HOME"] = "/tmp/hf_cache"
        os.makedirs("/tmp/hf_cache", exist_ok=True)

        # ✅ Download dataset from HF
        local_dir = Path(f"/tmp/{LORA_NAME}-{uuid.uuid4()}")
        os.makedirs(local_dir, exist_ok=True)

        snapshot_path = snapshot_download(
            repo_id=REPO_ID,
            repo_type="model",
            local_dir=local_dir,
            local_dir_use_symlinks=False,
            allow_patterns=[f"{FOLDER_IN_REPO}/*"],  # only files inside the subfolder
        )

        image_dir = local_dir / FOLDER_IN_REPO
        image_paths = list(image_dir.rglob("*.jpg")) + list(image_dir.rglob("*.jpeg")) + list(image_dir.rglob("*.png"))

        if not image_paths:
            raise HTTPException(status_code=400, detail="No images found in the Hugging Face folder.")

        # ✅ Auto-generate captions
        captions = [
            f"Autogenerated caption for {img.stem} in the {CONCEPT_SENTENCE} [trigger]" for img in image_paths
        ]

        # ✅ Create dataset folder with metadata.jsonl
        dataset_folder = os.path.join("/tmp", f"datasets_{uuid.uuid4()}")
        os.makedirs(dataset_folder, exist_ok=True)
        print("Dataset folder created =================================================")

        jsonl_file_path = os.path.join(dataset_folder, "metadata.jsonl")
        with open(jsonl_file_path, "a") as jsonl_file:
            for index, image in enumerate(image_paths):
                new_image_path = shutil.copy(str(image), dataset_folder)
                file_name = os.path.basename(new_image_path)
                data = {"file_name": file_name, "prompt": captions[index]}
                jsonl_file.write(json.dumps(data) + "\n")

        # ✅ Optional advanced config (sample prompts written to a YAML file)
        slugged_lora_name = LORA_NAME.replace(" ", "_")
        os.makedirs("/tmp/tmp_configs", exist_ok=True)
        config_path = f"/tmp/tmp_configs/{uuid.uuid4()}_{slugged_lora_name}.yaml"
        config = {
            "sample_1": "a stylish anime character with ohamlab style",
            "sample_2": "a cartoon car in ohamlab style",
            "sample_3": "portrait in ohamlab lighting"
        }
        with open(config_path, "w") as f:
            yaml.dump(config, f)

        # ✅ Final call to train
        print(f"slugged_lora_name: {slugged_lora_name}")
        print("Starting training; dataset and config are ready =================================================")
        result = start_training(
            lora_name=LORA_NAME,
            concept_sentence=CONCEPT_SENTENCE,
            steps=45,
            lr=1e-4,
            rank=32,
            model_to_train="flux",
            low_vram=True,
            dataset_folder=dataset_folder,
            sample_1=config["sample_1"],
            sample_2=config["sample_2"],
            sample_3=config["sample_3"],
            use_more_advanced_options=True,
            more_advanced_options=config_path
        )

        return JSONResponse(content={"status": "success", "message": result})

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
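
# To serve the API locally (assumed entry point; the Space may launch it differently):
#   uvicorn app:app --host 0.0.0.0 --port 7860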