"""Gradio front end that runs a mergekit merge once per method and uploads each result."""

import os
import pathlib
import random
import string
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Generator, Iterable, List

import gradio as gr
import huggingface_hub
import torch
import yaml
from gradio_logsview.logsview import Log, LogsView, LogsViewRunner
from mergekit.config import MergeConfiguration

from clean_community_org import garbage_collect_empty_models

has_gpu = torch.cuda.is_available()

# Command line executed inside the temporary merge directory; the flags differ
# depending on whether a CUDA device is available.
cli = "mergekit-yaml config.yaml merge --copy-tokenizer" + (
    " --cuda --low-cpu-memory --allow-crimes"
    if has_gpu
    else " --allow-crimes --out-shard-size 1B --lazy-unpickle"
)

# Merge methods applied, in order, to every submitted configuration.
METHODS_TO_MERGE = ["dare_ties", "slerp", "ties"]

MARKDOWN_DESCRIPTION = """
# mergekit-gui

The fastest way to perform a model merge 🔥

Specify a YAML configuration file (see examples below) and a HF token and this app will perform the merge and upload the merged model to your user profile.
"""

# Shown below the examples. This constant was referenced by the UI but never
# defined, which raised NameError while the interface was being built.
MARKDOWN_ARTICLE = f"""
___

## How it works

The YAML configuration is validated and then merged once per method, in this
order: {", ".join(f"`{m}`" for m in METHODS_TO_MERGE)}. Each successful merge is
uploaded to the Hugging Face Hub.
"""

examples = [[str(f)] for f in pathlib.Path("examples").glob("*.yaml")]

COMMUNITY_HF_TOKEN = os.getenv("COMMUNITY_HF_TOKEN")


def merge_multiple_methods(
    yaml_config: str, hf_token: str, repo_name: str, profile_name: str
) -> Iterable[List[Log]]:
    """Validate ``yaml_config`` and run one merge per entry of ``METHODS_TO_MERGE``.

    Args:
        yaml_config: Merge configuration as YAML text.
        hf_token: Hugging Face token used to create repos and upload.
        repo_name: Target repo name; a random one is generated per method if empty.
        profile_name: Namespace used when a repo name has to be generated.

    Yields:
        The log list produced by the ``LogsViewRunner`` after every new entry.
    """
    runner = LogsViewRunner()

    if not yaml_config:
        yield runner.log("Empty yaml, pick an example below", level="ERROR")
        return
    try:
        # Validation only; the parsed object itself is not needed afterwards.
        MergeConfiguration.model_validate(yaml.safe_load(yaml_config))
    except Exception as e:
        yield runner.log(f"Invalid yaml {e}", level="ERROR")
        return

    failed_methods = []
    for method in METHODS_TO_MERGE:
        # The helper overrides ``merge_method`` itself, so the original YAML can
        # be handed over unchanged for every method.
        succeeded = yield from run_merge_for_method(
            method, yaml_config, hf_token, repo_name, profile_name, runner
        )
        if succeeded:
            yield runner.log(f"Model merged with {method}. Proceeding to next method...")
        else:
            # Keep going: the remaining methods do not depend on this one.
            failed_methods.append(method)

    if failed_methods:
        yield runner.log(
            f"Merge did not complete for: {', '.join(failed_methods)}", level="ERROR"
        )
        return

    # NOTE(review): placeholder path kept from the original; nothing in this
    # function actually writes a unified model to it.
    merged_model_path = "final_merged_model"
    yield runner.log(
        f"Model successfully merged using all methods. Saving unified model to {merged_model_path}"
    )
    example_yaml = generate_example_yaml(METHODS_TO_MERGE)
    yield runner.log(f"Generated example YAML: {example_yaml}")


def get_merged_yaml(original_yaml: str, method: str) -> str:
    """Return ``original_yaml`` re-serialised with ``merge_method`` set to ``method``."""
    yaml_data = yaml.safe_load(original_yaml)
    yaml_data["merge_method"] = method
    return yaml.dump(yaml_data)


def _default_repo_name(method: str, profile_name: str) -> str:
    """Build a random repo id for ``method``, under ``profile_name`` when given."""
    suffix = "".join(random.choices(string.ascii_lowercase, k=7))
    # Sanitise only the name part; the "/" separating namespace and name must
    # survive, otherwise the repo is not created under the requested profile.
    name = f"mergekit-{method}-{suffix}".replace("/", "-").strip("-")
    namespace = (profile_name or "").strip().strip("/")
    return f"{namespace}/{name}" if namespace else name


def run_merge_for_method(
    method: str,
    yaml_config: str,
    hf_token: str,
    repo_name: str,
    profile_name: str,
    runner: LogsViewRunner,
) -> Generator[List[Log], None, bool]:
    """Run a single merge with ``method`` and upload the result.

    The configuration is written to a temporary directory, the target repo is
    created, the mergekit CLI is executed there and, when it exits with code 0,
    the ``merge`` output folder is uploaded.

    Args:
        method: Value written to ``merge_method`` in the configuration.
        yaml_config: Merge configuration as YAML text.
        hf_token: Hugging Face token used for all Hub calls.
        repo_name: Target repo; generated from ``method``/``profile_name`` if empty.
        profile_name: Namespace for a generated repo name.
        runner: Shared runner collecting the logs.

    Yields:
        The runner's log list after every new entry.

    Returns:
        ``True`` when the upload step was reached and finished, ``False`` when
        repo creation or the merge command failed.
    """
    new_yaml_config = get_merged_yaml(yaml_config, method)
    api = huggingface_hub.HfApi(token=hf_token)

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdirname:
        tmpdir = pathlib.Path(tmpdirname)
        merged_path = tmpdir / "merged"
        merged_path.mkdir(parents=True, exist_ok=True)
        config_path = merged_path / "config.yaml"
        config_path.write_text(new_yaml_config)
        yield runner.log(f"Merge configuration saved for {method} in {config_path}")

        # NOTE(review): an explicit repo_name is reused as-is for every method,
        # so later uploads land in the same repo — confirm this is intended.
        if not repo_name:
            repo_name = _default_repo_name(method, profile_name)

        try:
            yield runner.log(f"Creating repo for {method} {repo_name}")
            repo_url = api.create_repo(repo_name, exist_ok=True)
            yield runner.log(f"Repo created for {method}: {repo_url}")
        except Exception as e:
            yield runner.log(f"Error creating repo for {method}: {e}", level="ERROR")
            return False

        # Point HF_HOME at the temporary directory for the subprocess only.
        tmp_env = os.environ.copy()
        tmp_env["HF_HOME"] = f"{tmpdirname}/.cache"
        full_cli = cli + f" --lora-merge-cache {tmpdirname}/.lora_cache"
        yield from runner.run_command(full_cli.split(), cwd=merged_path, env=tmp_env)

        if runner.exit_code != 0:
            yield runner.log(
                f"Merge for {method} failed. Deleting repo as no model is uploaded.",
                level="ERROR",
            )
            # NOTE(review): with exist_ok=True above this also deletes a repo
            # that existed before this run — verify that is acceptable.
            api.delete_repo(repo_url.repo_id)
            return False

        yield runner.log(f"Model merged with {method}. Uploading to HF.")
        yield from runner.run_python(
            api.upload_folder,
            repo_id=repo_url.repo_id,
            folder_path=merged_path / "merge",
        )
        yield runner.log(
            f"Model successfully uploaded to HF with {method}: {repo_url.repo_id}"
        )
        return True


def generate_example_yaml(methods: List[str]) -> str:
    """Generate an example YAML document reflecting the applied sequence of merge methods."""
    example_yaml = {
        "merge_method": "linear",  # or whichever final method is chosen
        "models": ["model1", "model2", "model3"],  # example models to merge
        "slices": None,  # slices can be added if needed
        "parameters": {
            "normalize": False,
            "weight": 0.5,
        },
        "tokenizer_source": "union",  # tokenizer definition
    }
    # Record the merge methods that were applied.
    example_yaml["merge_method_sequence"] = methods
    return yaml.dump(example_yaml)


def merge(
    yaml_config: str, hf_token: str, repo_name: str, profile_name: str
) -> Iterable[List[Log]]:
    """Gradio click handler: validate the configuration and run every merge method.

    Empty or invalid YAML is reported by ``merge_multiple_methods`` with the
    same messages this function used to emit itself, so it simply delegates.
    """
    yield from merge_multiple_methods(yaml_config, hf_token, repo_name, profile_name)


with gr.Blocks() as demo:
    gr.Markdown(MARKDOWN_DESCRIPTION)

    with gr.Row():
        filename = gr.Textbox(visible=False, label="filename")
        config = gr.Code(language="yaml", lines=10, label="config.yaml")
        with gr.Column():
            # NOTE(review): the placeholder advertises a community fallback, but
            # nothing in this file substitutes COMMUNITY_HF_TOKEN for an empty
            # token — confirm the intended behaviour.
            token = gr.Textbox(
                lines=1,
                label="HF Write Token",
                info="https://hf.co/settings/token",
                type="password",
                placeholder="Optional. Will upload merged model to MergeKit Community if empty.",
            )
            repo_name = gr.Textbox(
                lines=1,
                label="Repo name",
                placeholder="Optional. Will create a random name if empty.",
            )
            profile_name = gr.Textbox(
                lines=1,
                label="Hugging Face Profile Name",
                placeholder="Enter your Hugging Face profile name.",
            )
    button = gr.Button("Merge", variant="primary")
    logs = LogsView(label="Terminal output")
    gr.Examples(
        examples,
        fn=lambda s: (s,),
        run_on_click=True,
        label="Examples",
        inputs=[filename],
        outputs=[config],
    )
    gr.Markdown(MARKDOWN_ARTICLE)

    button.click(
        fn=merge,
        inputs=[config, token, repo_name, profile_name],
        outputs=[logs],
    )


def _garbage_collect_every_hour():
    """Call ``garbage_collect_empty_models`` once an hour, forever."""
    while True:
        try:
            garbage_collect_empty_models(token=COMMUNITY_HF_TOKEN)
        except Exception as e:
            # Best effort: report the failure and try again on the next cycle.
            print("Error running garbage collection", e)
        time.sleep(3600)


pool = ThreadPoolExecutor()
pool.submit(_garbage_collect_every_hour)

demo.queue(default_concurrency_limit=2).launch()