#!/bin/bash
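#
# Periodically back up Halo's data directory (~/.halo2) to a private
# Hugging Face dataset, restore the newest backup on startup, and then
# launch Halo itself.
#
# Required environment:
#   HF_TOKEN       - Hugging Face access token with write permission
#   DATASET_ID     - target dataset repo (e.g. "user/halo-backups" - placeholder)
# Optional:
#   DATASET_N      - number of backups to keep (default: 10)
#   SYNC_INTERVAL  - seconds between backups (default: 36000, i.e. 10 hours)
#   JVM_OPTS       - extra JVM flags passed to the Halo process
#
# Example invocation (values are illustrative):
#   HF_TOKEN=hf_xxx DATASET_ID=user/halo-backups ./sync_data.sh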
# Check backup-related environment variables
if [ -z "$HF_TOKEN" ] || [ -z "$DATASET_ID" ]; then
    echo "⚠️ HF_TOKEN or DATASET_ID is missing; backups are disabled"
    exec java ${JVM_OPTS} -jar /opt/halo/halo.jar
    exit 0  # unreachable once exec succeeds; kept as a safety net
fi
# Set defaults
DATASET_N=${DATASET_N:-10}             # keep the newest 10 backups by default
SYNC_INTERVAL=${SYNC_INTERVAL:-36000}  # default sync interval: 36000 seconds (10 hours)
BACKUP_DIR="$HOME/.halo2"
BACKUP_PREFIX="halo_backup_"
BACKUP_EXT=".tar.gz"
HF_BRANCH="main"
# Print a timestamped message to the console
print_message() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1"
}
# Create the Hugging Face dataset if it does not exist yet
create_dataset() {
    print_message "Checking whether Hugging Face dataset '${DATASET_ID}' exists..."
    python3 <<EOF
import os
from huggingface_hub import HfApi

try:
    api = HfApi(token=os.getenv("HF_TOKEN"))
    repo_id = os.getenv("DATASET_ID")
    # List every dataset owned by the user/org part of the repo id
    user_datasets = [d.id for d in api.list_datasets(author=repo_id.split('/')[0])]
    if repo_id not in user_datasets:
        api.create_repo(repo_id=repo_id, repo_type="dataset", private=True)
        print(f"✅ Dataset '{repo_id}' did not exist; created it (private).")
    else:
        print(f"✅ Dataset '{repo_id}' already exists.")
except Exception as e:
    print(f"⚠️ Dataset check/creation failed: {str(e)}")
EOF
}
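
# Note: assuming a reasonably recent huggingface_hub, an idempotent one-liner
# could replace the listing above:
#   api.create_repo(repo_id=repo_id, repo_type="dataset", private=True, exist_ok=True)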
# Create the target branch on the dataset if it does not exist yet
create_branch() {
    print_message "Checking the '${HF_BRANCH}' branch of the Hugging Face dataset..."
    python3 <<EOF
import os
from huggingface_hub import HfApi

try:
    api = HfApi(token=os.getenv("HF_TOKEN"))
    repo_id = os.getenv("DATASET_ID")
    hf_branch = "${HF_BRANCH}"
    # List all branches of the dataset repo
    branches = api.list_repo_refs(repo_id, repo_type="dataset").branches
    branch_names = [b.name for b in branches]
    if hf_branch not in branch_names:
        api.create_branch(repo_id=repo_id, branch=hf_branch, repo_type="dataset")
        print(f"✅ Branch '{hf_branch}' did not exist; created it.")
    else:
        print(f"✅ Branch '{hf_branch}' already exists.")
except Exception as e:
    print(f"⚠️ Branch check/creation failed: {str(e)}")
EOF
}
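
# Note: create_repo initializes the default 'main' branch, so with
# HF_BRANCH="main" this check is effectively a no-op; it only matters if
# HF_BRANCH is later pointed at a non-default branch.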
# Download the most recent backup and restore it into BACKUP_DIR
download_data() {
    print_message "Downloading the latest backup..."
    python3 <<EOF
import os
import tarfile
import tempfile
from huggingface_hub import HfApi

def download_and_extract(api, repo_id, branch):
    """Download the newest backup archive and unpack it."""
    files = api.list_repo_files(repo_id=repo_id, repo_type='dataset', revision=branch)
    backup_files = sorted(f for f in files if f.startswith('${BACKUP_PREFIX}') and f.endswith('${BACKUP_EXT}'))
    if not backup_files:
        print("⚠️ No backup files found")
        return False
    latest_backup = backup_files[-1]
    # Download the archive into a throwaway directory
    with tempfile.TemporaryDirectory() as temp_dir:
        filepath = api.hf_hub_download(
            repo_id=repo_id,
            filename=latest_backup,
            repo_type='dataset',
            local_dir=temp_dir,
            revision=branch
        )
        # Extract straight into BACKUP_DIR
        with tarfile.open(filepath, 'r:gz') as tar:
            tar.extractall("${BACKUP_DIR}")
        print("✅ Restored the latest backup into ${BACKUP_DIR}")
    return True

if __name__ == "__main__":
    api = HfApi(token=os.getenv("HF_TOKEN"))
    restored = download_and_extract(api, os.getenv("DATASET_ID"), "${HF_BRANCH}")
    print("RESTORED=1" if restored else "RESTORED=0")
EOF
}
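
# Note: the RESTORED=0/1 line printed above is informational only; nothing in
# this script parses it. A hypothetical caller could consume it like so
# (sketch, relies on it being the last line of output):
#   restored=$(download_data | tail -n1)
#   [ "$restored" = "RESTORED=1" ] && print_message "Restore succeeded"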
# Back up data to Hugging Face
backup_data() {
    local timestamp=$(date +%Y%m%d_%H%M%S)
    local backup_file="${BACKUP_PREFIX}${timestamp}${BACKUP_EXT}"
    # Create a temporary directory via Python
    local temp_dir=$(python3 -c "import tempfile; print(tempfile.mkdtemp())")
    local backup_path="${temp_dir}/${backup_file}"
    print_message "Starting backup: ${backup_file} (temp dir: ${temp_dir})"
    # Create the backup archive (ensure the data dir exists on first run,
    # before Halo has had a chance to create it)
    mkdir -p "$BACKUP_DIR"
    tar -czf "$backup_path" -C "$BACKUP_DIR" .
    # Upload the archive and prune old backups via Python
    python3 <<EOF
import os
from huggingface_hub import HfApi

try:
    api = HfApi(token=os.getenv("HF_TOKEN"))
    repo_id = os.getenv("DATASET_ID")
    backup_file = "${backup_path}"
    dataset_n = ${DATASET_N}
    hf_branch = "${HF_BRANCH}"
    # Upload the new backup to the target branch
    api.upload_file(
        path_or_fileobj=backup_file,
        path_in_repo=os.path.basename(backup_file),
        repo_id=repo_id,
        repo_type="dataset",
        revision=hf_branch
    )
    print(f"✅ Backup uploaded (branch: {hf_branch}): {os.path.basename(backup_file)}")
    # List every backup archive currently in the repo
    files = api.list_repo_files(repo_id, repo_type="dataset", revision=hf_branch)
    backup_files = sorted(f for f in files if f.startswith("${BACKUP_PREFIX}") and f.endswith("${BACKUP_EXT}"))
    # Delete old backups, keeping only the newest dataset_n
    if len(backup_files) > dataset_n:
        to_delete = backup_files[:-dataset_n]
        for old_file in to_delete:
            api.delete_file(path_in_repo=old_file, repo_id=repo_id, repo_type="dataset", revision=hf_branch)
            print(f"🗑️ Deleted expired backup (branch: {hf_branch}): {old_file}")
except Exception as e:
    print(f"⚠️ Backup failed: {str(e)}")
EOF
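    # Retention relies on lexicographic filename sorting, which is
    # chronological only because the timestamp (%Y%m%d_%H%M%S) is
    # fixed-width and zero-padded.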
    # Clean up the local temporary archive and directory
    rm -f "$backup_path"
    rm -rf "$temp_dir"
    print_message "✅ Backup finished: ${backup_file}"
    # Squash the commit history with super_squash_history
    print_message "Squashing commit history..."
    python3 <<EOF
import os
from huggingface_hub import HfApi

try:
    api = HfApi(token=os.getenv("HF_TOKEN"))
    repo_id = os.getenv("DATASET_ID")
    hf_branch = "${HF_BRANCH}"
    api.super_squash_history(repo_id=repo_id, repo_type="dataset", branch=hf_branch)
    print(f"✅ Commit history squashed (branch: {hf_branch})")
except Exception as e:
    print(f"⚠️ Failed to squash commit history: {str(e)}")
EOF
    print_message "Commit history squash finished"
}
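
# Note: Hugging Face datasets are git-backed, so files deleted above still
# occupy space in the branch history; super_squash_history collapses the
# branch to a single commit to keep the repo's storage footprint bounded.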
# Run the periodic sync loop
sync_data() {
    while true; do
        backup_data
        print_message "⏳ Next sync in ${SYNC_INTERVAL} seconds..."
        sleep "$SYNC_INTERVAL"
    done
}
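
# The loop backs up immediately on startup (right after the restore below)
# and only then sleeps, so a fresh deployment gets its first backup without
# waiting a full SYNC_INTERVAL.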
# Main entry point
(
    print_message "🚀 Starting up; fetching the latest backup from Hugging Face..."
    create_dataset
    create_branch
    download_data
    sync_data &
    exec java ${JVM_OPTS} -jar /opt/halo/halo.jar
) 2>&1