# mistral7b/tokenize_and_upload_mistral.py
import os
import pandas as pd
from datasets import Dataset
from transformers import AutoTokenizer, AutoConfig
from datetime import datetime
from huggingface_hub import HfApi, create_repo, upload_folder, hf_hub_download
import traceback
import threading
import uvicorn
import time
from fastapi import FastAPI
from fastapi.responses import JSONResponse
# === Constants ===
MODEL_NAME = "TURKCELL/Turkcell-LLM-7b-v1"
HF_TOKEN = os.getenv("HF_TOKEN")
SOURCE_DATASET_ID = "UcsTurkey/turkish-train-chunks"
TRAIN_TARGET_DATASET_ID = "UcsTurkey/turkish-train-tokenized"
RAG_TARGET_DATASET_ID = "UcsTurkey/turkish-train-rag"
BUFFER_SIZE = 5              # number of processed chunk files to accumulate locally before each upload
START_CHUNK_NUMBER = 0       # index of the first source chunk to process
PROCESS_CHUNK_COUNT = 776    # how many chunks to process in this run
GENERATE_TRAIN_DATA = False  # produce tokenized training parquets
GENERATE_RAG_DATA = True     # produce raw instruction/output parquets for RAG
CHUNK_FOLDER = "/data/chunks"
TRAIN_FOLDER = "/data/tokenized_chunks"
RAG_FOLDER = "/data/rag_chunks"
CACHE_DIR = "/data/.hf_cache"
os.makedirs(CHUNK_FOLDER, exist_ok=True)
os.makedirs(TRAIN_FOLDER, exist_ok=True)
os.makedirs(RAG_FOLDER, exist_ok=True)
os.makedirs(CACHE_DIR, exist_ok=True)
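# NOTE: the /data paths assume a writable, persistent volume (e.g. a Hugging Face
# Space with persistent storage attached); adjust them for other environments.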
# ✅ Health check server
app = FastAPI()

@app.get("/")
def health():
    return JSONResponse(content={"status": "ok"})

def run_health_server():
    uvicorn.run(app, host="0.0.0.0", port=7860)

threading.Thread(target=run_health_server, daemon=True).start()
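# The health endpoint listens on port 7860, the default port a Hugging Face Space
# expects a web app on, so the platform keeps the container alive while the main
# thread does the long-running chunk processing.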
# 🕒 Timestamped log helper
def log(message):
    timestamp = datetime.now().strftime("%H:%M:%S")
    # flush=True makes log lines appear immediately in container logs
    print(f"[{timestamp}] {message}", flush=True)
# === Tokenizer ===
os.environ["HF_HOME"] = CACHE_DIR
log(f"🔁 Loading tokenizer: {MODEL_NAME}")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=False, cache_dir=CACHE_DIR)
if tokenizer.pad_token is None:
    log("ℹ️ pad_token is not defined; assigning eos_token.")
    tokenizer.pad_token = tokenizer.eos_token
config = AutoConfig.from_pretrained(MODEL_NAME, cache_dir=CACHE_DIR)
# Fall back to 2048 if the model config does not expose max_position_embeddings
MAX_LEN = getattr(config, "max_position_embeddings", 2048)
# === Hugging Face API ===
api = HfApi()
files = api.list_repo_files(repo_id=SOURCE_DATASET_ID, repo_type="dataset", token=HF_TOKEN)
csv_files = sorted([f for f in files if f.endswith(".csv")])
selected_files = csv_files[START_CHUNK_NUMBER:START_CHUNK_NUMBER + PROCESS_CHUNK_COUNT]
buffer_counter_train = 0
buffer_counter_rag = 0
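# Chunks are processed one file at a time; finished parquets accumulate in the
# local folders and are pushed to the Hub in batches of BUFFER_SIZE, after which
# upload_if_ready() empties the folder and the counter is reset to 0.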
def tokenize(example):
    # Alpaca-style instruction prompt (note: this is not Mistral's [INST] chat template)
    prompt = f"### Instruction:\n{example['instruction']}\n\n### Response:\n{example['output']}"
    tokenized = tokenizer(prompt, truncation=True, padding="max_length", max_length=MAX_LEN)
    # Mask pad positions with -100 so the cross-entropy loss ignores them
    tokenized["labels"] = [
        -100 if token_id == tokenizer.pad_token_id else token_id
        for token_id in tokenized["input_ids"]
    ]
    return tokenized
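# For illustration, a row like {"instruction": "Başkent neresi?", "output": "Ankara"}
# (hypothetical values) renders as:
#   ### Instruction:
#   Başkent neresi?
#
#   ### Response:
#   Ankara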
def upload_if_ready(folder_path, target_repo):
    if os.listdir(folder_path):
        log(f"⬆️ Buffer full. Uploading to Hugging Face: {target_repo}")
        create_repo(target_repo, repo_type="dataset", token=HF_TOKEN, exist_ok=True)
        upload_folder(repo_id=target_repo, folder_path=folder_path, repo_type="dataset", token=HF_TOKEN)
        log("🧹 Cleaning up folder after upload...")
        for f in os.listdir(folder_path):
            os.remove(os.path.join(folder_path, f))
    return 0
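# upload_if_ready() always returns 0 so call sites can reset their buffer
# counter in a single assignment, e.g.:
#   buffer_counter_rag = upload_if_ready(RAG_FOLDER, RAG_TARGET_DATASET_ID)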
for idx, filename in enumerate(selected_files):
    log(f"\n📄 Processing {idx + 1}/{len(selected_files)}: {filename}...")
    try:
        # hf_hub_download returns the actual local path, which also handles
        # filenames that contain subdirectories in the source repo
        local_path = hf_hub_download(
            repo_id=SOURCE_DATASET_ID,
            filename=filename,
            local_dir=CHUNK_FOLDER,
            token=HF_TOKEN,
            repo_type="dataset"
        )
        df = pd.read_csv(local_path).dropna()
        # Drop rows where question or answer is empty after stripping whitespace
        df = df[df["question"].str.strip().astype(bool) & df["answer"].str.strip().astype(bool)]
        df = df.rename(columns={"question": "instruction", "answer": "output"})
        log(f"✅ Valid row count: {len(df)}")

        if GENERATE_RAG_DATA:
            rag_dataset = Dataset.from_pandas(df[["instruction", "output"]])
            rag_path = os.path.join(RAG_FOLDER, os.path.basename(filename).replace(".csv", ".parquet"))
            rag_dataset.to_parquet(rag_path, compression="brotli")
            log(f"📦 RAG parquet saved: {rag_path}")
            buffer_counter_rag += 1
            if buffer_counter_rag >= BUFFER_SIZE:
                buffer_counter_rag = upload_if_ready(RAG_FOLDER, RAG_TARGET_DATASET_ID)

        if GENERATE_TRAIN_DATA:
            train_dataset = Dataset.from_pandas(df[["instruction", "output"]])
            tokenized_dataset = train_dataset.map(tokenize)
            parquet_path = os.path.join(TRAIN_FOLDER, os.path.basename(filename).replace(".csv", ".parquet"))
            tokenized_dataset.to_parquet(parquet_path, compression="snappy")
            log(f"🎯 Tokenized parquet saved: {parquet_path}")
            buffer_counter_train += 1
            if buffer_counter_train >= BUFFER_SIZE:
                buffer_counter_train = upload_if_ready(TRAIN_FOLDER, TRAIN_TARGET_DATASET_ID)
    except Exception as e:
        log(f"❌ Error while processing {filename}: {e}")
        traceback.print_exc()
        continue
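# Final flush: push anything still buffered even if BUFFER_SIZE was never reached.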
if GENERATE_TRAIN_DATA:
    buffer_counter_train = upload_if_ready(TRAIN_FOLDER, TRAIN_TARGET_DATASET_ID)
if GENERATE_RAG_DATA:
    buffer_counter_rag = upload_if_ready(RAG_FOLDER, RAG_TARGET_DATASET_ID)

log("✅ All processing complete. Service entering idle mode...")
while True:
    time.sleep(60)