|
import os |
|
import pathlib |
|
import random |
|
import string |
|
import tempfile |
|
import time |
|
from concurrent.futures import ThreadPoolExecutor |
|
from typing import Iterable, List |
|
|
|
import gradio as gr |
|
import huggingface_hub |
|
import torch |
|
import yaml |
|
from gradio_logsview.logsview import Log, LogsView, LogsViewRunner |
|
from mergekit.config import MergeConfiguration |
|
from clean_community_org import garbage_collect_empty_models |
|
|
|
# True when torch can see a CUDA device; selects the mergekit flags below.
has_gpu = torch.cuda.is_available()

# mergekit command line shared by every merge. Flags common to both variants
# come first, then the device-specific ones are appended.
cli = "mergekit-yaml config.yaml merge --copy-tokenizer"
if has_gpu:
    cli += " --cuda --low-cpu-memory --allow-crimes"
else:
    cli += " --allow-crimes --out-shard-size 1B --lazy-unpickle"
|
|
|
# Markdown rendered at the top of the page.
MARKDOWN_DESCRIPTION = """
# mergekit-gui

The fastest way to perform a model merge 🔥

Specify a YAML configuration file (see examples below) and a HF token and this app will perform the merge and upload the merged model to your user profile.
"""

# Markdown rendered below the examples. The page layout passes this name to
# gr.Markdown while the module is being imported, so it must be defined here;
# without it the app fails at start-up with a NameError.
MARKDOWN_ARTICLE = """
## Merge configuration

The configuration is a [mergekit](https://github.com/arcee-ai/mergekit) YAML document.
See the mergekit documentation for the available merge methods and their parameters.
"""

# One single-element row per YAML file found in the local "examples" directory
# (an empty list when the directory is missing or holds no YAML files).
examples = [[str(f)] for f in pathlib.Path("examples").glob("*.yaml")]

# Token read from the environment (None when unset); it is handed to the
# periodic garbage collection of the community organisation.
COMMUNITY_HF_TOKEN = os.getenv("COMMUNITY_HF_TOKEN")
|
|
|
def merge_multiple_methods(yaml_config: str, hf_token: str, repo_name: str, profile_name: str) -> Iterable[List[Log]]:
    """Run the merge once per built-in method and stream the logs to the UI.

    The configuration is validated first; an empty or invalid document is
    reported as an ERROR log entry and nothing else happens. Otherwise the
    merge is executed for each of ``dare_ties``, ``slerp`` and ``ties`` in
    that order, and finally an example YAML describing the sequence is logged.

    Args:
        yaml_config: mergekit configuration as YAML text.
        hf_token: Hugging Face token forwarded to every per-method merge.
        repo_name: Target repository name forwarded to every per-method merge
            (may be empty).
        profile_name: Hugging Face profile name forwarded to every per-method
            merge (may be empty).

    Yields:
        Whatever ``runner.log`` and the per-method merge yield, i.e. the log
        batches consumed by the ``LogsView`` component.
    """
    runner = LogsViewRunner()

    if not yaml_config:
        yield runner.log("Empty yaml, pick an example below", level="ERROR")
        return

    try:
        # Validation only: the parsed configuration object is not used afterwards.
        MergeConfiguration.model_validate(yaml.safe_load(yaml_config))
    except Exception as e:
        yield runner.log(f"Invalid yaml {e}", level="ERROR")
        return

    methods_to_merge = ['dare_ties', 'slerp', 'ties']
    current_yaml_config = yaml_config
    failed_methods = []

    for method in methods_to_merge:
        # The per-method merge is a generator; ``yield from`` hands back its
        # return value. Only an explicit ``False`` is treated as a failure, so
        # a merge step that returns nothing is still counted as completed.
        outcome = yield from run_merge_for_method(method, current_yaml_config, hf_token, repo_name, profile_name, runner)
        current_yaml_config = get_merged_yaml(current_yaml_config, method)

        if outcome is False:
            failed_methods.append(method)
            yield runner.log(f"Merge with {method} did not complete. Proceeding to next method...", level="ERROR")
        else:
            yield runner.log(f"Model merged with {method}. Proceeding to next method...")

    if failed_methods:
        # Do not claim overall success when at least one method reported failure.
        yield runner.log(f"Merge failed for: {', '.join(failed_methods)}", level="ERROR")
    else:
        # NOTE(review): nothing in this function writes to this path; the
        # message is kept as-is, but confirm whether a final save step is missing.
        merged_model_path = "final_merged_model"
        yield runner.log(f"Model successfully merged using all methods. Saving unified model to {merged_model_path}")

    example_yaml = generate_example_yaml(methods_to_merge)
    yield runner.log(f"Generated example YAML: {example_yaml}")
|
|
|
|
|
|
|
|
|
def get_merged_yaml(original_yaml: str, method: str) -> str:
    """Return ``original_yaml`` re-serialised with ``merge_method`` set to ``method``.

    Args:
        original_yaml: YAML text whose top level is expected to be a mapping.
        method: Value stored under the ``merge_method`` key.

    Returns:
        The updated document dumped back to YAML text.
    """
    document = yaml.safe_load(original_yaml)
    # Build the updated mapping in one expression; an existing ``merge_method``
    # entry is overridden, a missing one is added.
    return yaml.dump({**document, 'merge_method': method})
|
|
|
def run_merge_for_method(method: str, yaml_config: str, hf_token: str, repo_name: str, profile_name: str, runner: LogsViewRunner) -> Iterable[List[Log]]:
    """Merge with a single method and upload the result, yielding logs as it goes.

    The configuration is rewritten to use ``method`` and saved as
    ``config.yaml`` in a temporary working directory. A Hub repository is then
    created, the module-level ``cli`` command is run in that directory, and the
    ``merge`` output folder is uploaded to the repository.

    Args:
        method: Merge method written into the configuration.
        yaml_config: mergekit configuration as YAML text.
        hf_token: Token used for every Hugging Face Hub call.
        repo_name: Target repository name. When empty, a name of the form
            ``mergekit-<method>-<7 random letters>`` is generated.
        profile_name: Only used as a prefix of the generated repository name
            when ``repo_name`` is empty.
        runner: Log runner shared with the caller; the merge command and the
            upload are executed through it.

    Yields:
        The log batches produced by ``runner``.

    Returns:
        (generator return value) ``False`` when repository creation raised or
        the merge command exited with a non-zero code; ``True`` once the merge
        command exited with code 0 and the upload step has been run.
    """
    # Same rewrite the sibling helper performs; reuse it instead of duplicating it.
    new_yaml_config = get_merged_yaml(yaml_config, method)

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdirname:
        tmpdir = pathlib.Path(tmpdirname)
        merged_path = tmpdir / "merged"
        merged_path.mkdir(parents=True, exist_ok=True)
        config_path = merged_path / "config.yaml"
        config_path.write_text(new_yaml_config)
        yield runner.log(f"Merge configuration saved for {method} in {config_path}")

        if not repo_name:
            repo_name = f"{profile_name}/mergekit-{method}" if profile_name else f"mergekit-{method}"
            repo_name += "-" + "".join(random.choices(string.ascii_lowercase, k=7))
            # NOTE(review): this also replaces the "/" after the profile prefix,
            # so the generated name is never namespaced - confirm this is intended.
            repo_name = repo_name.replace("/", "-").strip("-")

        # One client for every Hub call made below.
        api = huggingface_hub.HfApi(token=hf_token)

        try:
            yield runner.log(f"Creating repo for {method} {repo_name}")
            repo_url = api.create_repo(repo_name, exist_ok=True)
            yield runner.log(f"Repo created for {method}: {repo_url}")
        except Exception as e:
            yield runner.log(f"Error creating repo for {method}: {e}", level="ERROR")
            return False

        # Keep the Hugging Face cache inside the temporary directory so it is
        # removed together with it.
        tmp_env = os.environ.copy()
        tmp_env["HF_HOME"] = str(tmpdir / ".cache")
        # Build the argument list directly so that a temporary path containing
        # whitespace is passed as one argument instead of being split.
        command = cli.split() + ["--lora-merge-cache", str(tmpdir / ".lora_cache")]
        yield from runner.run_command(command, cwd=merged_path, env=tmp_env)

        if runner.exit_code != 0:
            yield runner.log(f"Merge for {method} failed. Deleting repo as no model is uploaded.", level="ERROR")
            # NOTE(review): the repo was created with exist_ok=True, so this may
            # delete a repository that already existed before this run.
            try:
                api.delete_repo(repo_url.repo_id)
            except Exception as e:
                # A failed clean-up must not escape the generator as a traceback.
                yield runner.log(f"Error deleting repo {repo_url.repo_id}: {e}", level="ERROR")
            return False

        yield runner.log(f"Model merged with {method}. Uploading to HF.")
        yield from runner.run_python(
            api.upload_folder,
            repo_id=repo_url.repo_id,
            folder_path=merged_path / "merge",
        )
        yield runner.log(f"Model successfully uploaded to HF with {method}: {repo_url.repo_id}")
        return True
|
|
|
def generate_example_yaml(methods: List[str]) -> str:
    """Build an example YAML document that reflects the sequence of merge methods applied.

    Args:
        methods: Merge method names, stored under ``merge_method_sequence``.

    Returns:
        The example configuration dumped to YAML text.
    """
    # The whole document is built as one literal; the remaining entries are
    # fixed placeholder values.
    return yaml.dump(
        {
            'merge_method': 'linear',
            'models': ['model1', 'model2', 'model3'],
            'slices': None,
            'parameters': {'normalize': False, 'weight': 0.5},
            'tokenizer_source': 'union',
            'merge_method_sequence': methods,
        }
    )
|
|
|
def merge(yaml_config: str, hf_token: str, repo_name: str, profile_name: str) -> Iterable[List[Log]]:
    """Click handler of the "Merge" button: run the multi-method merge.

    This is a thin entry point. ``merge_multiple_methods`` already rejects an
    empty or invalid configuration with the same ERROR log entries, so the
    checks are not repeated here (they used to run twice, on two separate
    runners, with the parsed result discarded both times).

    Args:
        yaml_config: mergekit configuration as YAML text.
        hf_token: Hugging Face token entered in the UI.
        repo_name: Target repository name entered in the UI (may be empty).
        profile_name: Hugging Face profile name entered in the UI (may be empty).

    Yields:
        The log batches produced by ``merge_multiple_methods``.
    """
    yield from merge_multiple_methods(yaml_config, hf_token, repo_name, profile_name)
|
|
|
# Page layout: description, input row, merge button, log view, examples, article.
with gr.Blocks() as demo:
    gr.Markdown(MARKDOWN_DESCRIPTION)

    with gr.Row():
        # Hidden textbox that receives the path of the example that was clicked.
        filename = gr.Textbox(label="filename", visible=False)
        config = gr.Code(label="config.yaml", language="yaml", lines=10)
        with gr.Column():
            token = gr.Textbox(
                label="HF Write Token",
                info="https://hf.co/settings/token",
                placeholder="Optional. Will upload merged model to MergeKit Community if empty.",
                type="password",
                lines=1,
            )
            repo_name = gr.Textbox(
                label="Repo name",
                placeholder="Optional. Will create a random name if empty.",
                lines=1,
            )
            profile_name = gr.Textbox(
                label="Hugging Face Profile Name",
                placeholder="Enter your Hugging Face profile name.",
                lines=1,
            )
    button = gr.Button("Merge", variant="primary")
    logs = LogsView(label="Terminal output")
    gr.Examples(
        examples,
        inputs=[filename],
        outputs=[config],
        # The example path is wrapped in a one-element tuple before it is sent
        # to the code editor - presumably gr.Code reads that as a file to load;
        # confirm against the Gradio documentation.
        fn=lambda s: (s,),
        run_on_click=True,
        label="Examples",
    )
    gr.Markdown(MARKDOWN_ARTICLE)

    # Stream the generator returned by ``merge`` into the log view.
    button.click(fn=merge, inputs=[config, token, repo_name, profile_name], outputs=[logs])
|
|
|
def _garbage_collect_every_hour():
    """Call ``garbage_collect_empty_models`` in an endless loop, pausing 3600 s between runs.

    The function never returns. An exception raised by a run is printed and
    the loop carries on with the next pause.
    """
    while True:
        try:
            garbage_collect_empty_models(token=COMMUNITY_HF_TOKEN)
        except Exception as error:
            # Best effort: report the failure and try again after the pause.
            print("Error running garbage collection", error)
        time.sleep(3600)
|
|
|
# Run the hourly garbage collection on a worker thread of a dedicated executor.
pool = ThreadPoolExecutor()
pool.submit(_garbage_collect_every_hour)

# Enable the request queue with a default concurrency limit of 2, then start the app.
demo.queue(default_concurrency_limit=2).launch()
|
|