File size: 2,934 Bytes
a9ef680
 
 
437062b
a9ef680
 
a62dc8a
bca805f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a8df29c
2414776
a9ef680
a659d08
a9ef680
a21a8cf
 
a62dc8a
 
 
 
 
 
 
 
 
 
 
 
 
 
0f5ffee
2414776
b1c1ca8
437062b
 
2414776
 
 
437062b
a62dc8a
 
 
 
 
 
 
 
 
 
 
 
437062b
a62dc8a
 
 
0f5ffee
a62dc8a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0f5ffee
bca805f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import streamlit as st
import requests
import io
import random
from PIL import Image
import os
import json
import time

# Restart mechanism: Check if the application has been running for more than 48 hours.
start_time_file = "start_time.txt"
if not os.path.exists(start_time_file):
    # Create the file and write the current time (in seconds since epoch)
    with open(start_time_file, "w") as f:
        f.write(str(time.time()))
else:
    try:
        with open(start_time_file, "r") as f:
            start_time = float(f.read())
        # Check if more than 48 hours have passed
        if time.time() - start_time > 48 * 3600:
            st.write("Restarting the space after 48 hours of uptime...")
            os._exit(0)  # Force a restart of the space
    except Exception as e:
        # If there's an issue reading the file, reset the start time.
        with open(start_time_file, "w") as f:
            f.write(str(time.time()))

# API Configuration
API_URL = "https://api-inference.huggingface.co/models/black-forest-labs/FLUX.1-dev"
headers = {"Authorization": f"Bearer {os.getenv('HF')}"}

def query(payload):
    response = requests.post(API_URL, headers=headers, json=payload)
    
    # Check for API errors
    if response.status_code != 200:
        try:
            error = response.json()
            return {"error": error.get("error", f"API Error: {response.status_code}")}
        except:
            return {"error": f"API Error: {response.status_code} - {response.text}"}
    
    # Check if response is actually an image
    if 'image' not in response.headers.get('Content-Type', ''):
        return {"error": "Unexpected non-image response from API"}
    
    return {"image": response.content}

def generate_image(prompt):
    random_seed = random.randint(0, 4294967295)
    payload = {
        "inputs": prompt,
        "parameters": {
            "seed": random_seed
        }
    }
    
    result = query(payload)
    
    if "error" in result:
        st.error(f"API Error: {result['error']}")
        return None
    
    try:
        return Image.open(io.BytesIO(result["image"]))
    except Exception as e:
        st.error(f"Failed to process image: {str(e)}")
        return None

# Check for 'text' parameter in URL
query_params = st.query_params
prompt_from_url = query_params.get('text')

if prompt_from_url:
    image = generate_image(prompt_from_url)
    if image:
        st.image(image, caption="Generated Image", use_container_width=True)
        
        # Provide download link
        img_buffer = io.BytesIO()
        image.save(img_buffer, format="PNG")
        img_buffer.seek(0)
        st.download_button(
            label="Download Image 📥",
            data=img_buffer,
            file_name="generated_image.png",
            mime="image/png"
        )
else:
    st.info("Add a 'text' parameter to the URL to generate an image. Example: ?text=astronaut riding a horse")