import os
import time
import threading
import traceback
from datetime import datetime

import pandas as pd
import uvicorn
from datasets import Dataset
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from huggingface_hub import HfApi, create_repo, upload_folder, hf_hub_download
from transformers import AutoTokenizer, AutoConfig
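
# Configuration: source and target dataset repos on the Hugging Face Hub,
# buffer size for batched uploads, and local working folders (the /data paths
# assume a persistent volume, e.g. a Space with persistent storage).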
MODEL_NAME = "TURKCELL/Turkcell-LLM-7b-v1"
HF_TOKEN = os.getenv("HF_TOKEN")
SOURCE_DATASET_ID = "UcsTurkey/turkish-train-chunks"
TRAIN_TARGET_DATASET_ID = "UcsTurkey/turkish-train-tokenized"
RAG_TARGET_DATASET_ID = "UcsTurkey/turkish-train-rag"
BUFFER_SIZE = 5
START_CHUNK_NUMBER = 0
PROCESS_CHUNK_COUNT = 776

GENERATE_TRAIN_DATA = False
GENERATE_RAG_DATA = True

CHUNK_FOLDER = "/data/chunks"
TRAIN_FOLDER = "/data/tokenized_chunks"
RAG_FOLDER = "/data/rag_chunks"
CACHE_DIR = "/data/.hf_cache"

os.makedirs(CHUNK_FOLDER, exist_ok=True)
os.makedirs(TRAIN_FOLDER, exist_ok=True)
os.makedirs(RAG_FOLDER, exist_ok=True)
os.makedirs(CACHE_DIR, exist_ok=True)
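

# Lightweight health endpoint so the hosting platform sees a responsive HTTP
# server on port 7860 while the batch job runs in the main thread.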
app = FastAPI()


@app.get("/")
def health():
    return JSONResponse(content={"status": "ok"})


def run_health_server():
    uvicorn.run(app, host="0.0.0.0", port=7860)


threading.Thread(target=run_health_server, daemon=True).start()


def log(message):
    timestamp = datetime.now().strftime("%H:%M:%S")
    # flush=True keeps log lines visible immediately on buffered stdout.
    print(f"[{timestamp}] {message}", flush=True)


# Point the Hugging Face cache at the persistent volume before any downloads.
os.environ["HF_HOME"] = CACHE_DIR

log(f"🔁 Loading tokenizer: {MODEL_NAME}")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=False, cache_dir=CACHE_DIR)
if tokenizer.pad_token is None:
    log("ℹ️ pad_token is not defined, assigning eos_token.")
    tokenizer.pad_token = tokenizer.eos_token

config = AutoConfig.from_pretrained(MODEL_NAME, cache_dir=CACHE_DIR)
# Pad/truncate to the model's context window; fall back to 2048 if unknown.
MAX_LEN = getattr(config, "max_position_embeddings", 2048)

# List all CSV chunks in the source dataset and select the requested window.
api = HfApi()
files = api.list_repo_files(repo_id=SOURCE_DATASET_ID, repo_type="dataset", token=HF_TOKEN)
csv_files = sorted([f for f in files if f.endswith(".csv")])
selected_files = csv_files[START_CHUNK_NUMBER:START_CHUNK_NUMBER + PROCESS_CHUNK_COUNT]

buffer_counter_train = 0
buffer_counter_rag = 0


def tokenize(example):
    # Build an instruction-tuning prompt and tokenize it to a fixed length.
    prompt = f"### Instruction:\n{example['instruction']}\n\n### Response:\n{example['output']}"
    tokenized = tokenizer(prompt, truncation=True, padding="max_length", max_length=MAX_LEN)
    # Copy input_ids to labels, masking padding with -100 so the loss ignores it.
    tokenized["labels"] = [
        -100 if token_id == tokenizer.pad_token_id else token_id
        for token_id in tokenized["input_ids"]
    ]
    return tokenized
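

# Optional sanity check (a minimal sketch, not part of the pipeline): the
# example dict below is illustrative only.
# sample = tokenize({"instruction": "Merhaba", "output": "Selam"})
# assert len(sample["input_ids"]) == MAX_LEN
# assert all(label == -100
#            for token_id, label in zip(sample["input_ids"], sample["labels"])
#            if token_id == tokenizer.pad_token_id)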


def upload_if_ready(folder_path, target_repo):
    # Upload everything in folder_path to the target dataset repo, then clear the folder.
    if os.listdir(folder_path):
        log(f"⬆️ Buffer is full. Uploading to Hugging Face: {target_repo}")
        create_repo(target_repo, repo_type="dataset", token=HF_TOKEN, exist_ok=True)
        upload_folder(repo_id=target_repo, folder_path=folder_path, repo_type="dataset", token=HF_TOKEN)
        log("🧹 Cleaning up the folder after upload...")
        for f in os.listdir(folder_path):
            os.remove(os.path.join(folder_path, f))
    # Always return 0 so callers can reset their buffer counters.
    return 0
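

# Main loop: download each CSV chunk, clean it, and write RAG and/or tokenized
# training parquet files, uploading whenever a buffer reaches BUFFER_SIZE.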
for idx, filename in enumerate(selected_files):
    log(f"\n📄 Processing {idx+1}/{len(selected_files)} → {filename}...")
    try:
        # hf_hub_download returns the actual local path, which also holds when
        # the repo filename contains subdirectories.
        local_path = hf_hub_download(
            repo_id=SOURCE_DATASET_ID,
            filename=filename,
            local_dir=CHUNK_FOLDER,
            token=HF_TOKEN,
            repo_type="dataset"
        )
        # Drop rows with missing or blank question/answer fields.
        df = pd.read_csv(local_path).dropna()
        df = df[df["question"].str.strip().astype(bool) & df["answer"].str.strip().astype(bool)]
        df = df.rename(columns={"question": "instruction", "answer": "output"})
        log(f"✅ Valid row count: {len(df)}")

        if GENERATE_RAG_DATA:
            rag_dataset = Dataset.from_pandas(df[["instruction", "output"]])
            # Keep only the base filename so output lands directly in the buffer folder.
            rag_path = os.path.join(RAG_FOLDER, os.path.basename(filename).replace(".csv", ".parquet"))
            rag_dataset.to_parquet(rag_path, compression="brotli")
            log(f"📦 RAG parquet saved: {rag_path}")
            buffer_counter_rag += 1
            if buffer_counter_rag >= BUFFER_SIZE:
                buffer_counter_rag = upload_if_ready(RAG_FOLDER, RAG_TARGET_DATASET_ID)

        if GENERATE_TRAIN_DATA:
            train_dataset = Dataset.from_pandas(df[["instruction", "output"]])
            tokenized_dataset = train_dataset.map(tokenize)
            parquet_path = os.path.join(TRAIN_FOLDER, os.path.basename(filename).replace(".csv", ".parquet"))
            tokenized_dataset.to_parquet(parquet_path, compression="snappy")
            log(f"🎯 Tokenized parquet saved: {parquet_path}")
            buffer_counter_train += 1
            if buffer_counter_train >= BUFFER_SIZE:
                buffer_counter_train = upload_if_ready(TRAIN_FOLDER, TRAIN_TARGET_DATASET_ID)

    except Exception as e:
        log(f"❌ Error occurred: {filename} → {e}")
        traceback.print_exc()
        continue

# Flush whatever is still buffered, then idle so the health server stays up.
if GENERATE_TRAIN_DATA:
    buffer_counter_train = upload_if_ready(TRAIN_FOLDER, TRAIN_TARGET_DATASET_ID)
if GENERATE_RAG_DATA:
    buffer_counter_rag = upload_if_ready(RAG_FOLDER, RAG_TARGET_DATASET_ID)

log("✅ All processing finished. Service is in standby mode...")
while True:
    time.sleep(60)