| | |
| |
|
| | from transformers import AutoTokenizer ,AutoModelForCausalLM, AutoModelForSeq2SeqLM |
| | import re |
| | |
| | import os |
| | import yaml |
| | import torch |
| | from torch import package |
| | |
| | |
| | from textwrap3 import wrap |
| | import replicate |
| | import chatGPT |
| |
|
| |
|
| | |
| | def get_length_param(text: str, tokenizer) -> str: |
| | tokens_count = len(tokenizer.encode(text)) |
| | if tokens_count <= 15: |
| | len_param = '1' |
| | elif tokens_count <= 50: |
| | len_param = '2' |
| | elif tokens_count <= 256: |
| | len_param = '3' |
| | else: |
| | len_param = '-' |
| | return len_param |
| |
|
| | def remove_duplicates(S): |
| | S = re.sub(r'[a-zA-Z]+', '', S) |
| | S = S.split() |
| | result = "" |
| | for subst in S: |
| | if subst not in result: |
| | result += subst+" " |
| | return result.rstrip() |
| |
|
| | def removeSigns(S): |
| | last_index = max(S.rfind("."), S.rfind("!")) |
| | if last_index >= 0: |
| | S = S[:last_index+1] |
| | return S |
| |
|
| | def prepare_punct(): |
| | |
| | |
| | torch.backends.quantized.engine = 'qnnpack' |
| |
|
| | torch.hub.download_url_to_file('https://raw.githubusercontent.com/snakers4/silero-models/master/models.yml', |
| | 'latest_silero_models.yml', |
| | progress=False) |
| |
|
| | with open('latest_silero_models.yml', 'r') as yaml_file: |
| | models = yaml.load(yaml_file, Loader=yaml.SafeLoader) |
| | model_conf = models.get('te_models').get('latest') |
| |
|
| | |
| | model_url = model_conf.get('package') |
| |
|
| | model_dir = "downloaded_model" |
| | os.makedirs(model_dir, exist_ok=True) |
| | model_path = os.path.join(model_dir, os.path.basename(model_url)) |
| |
|
| | if not os.path.isfile(model_path): |
| | torch.hub.download_url_to_file(model_url, |
| | model_path, |
| | progress=True) |
| |
|
| | imp = package.PackageImporter(model_path) |
| | model_punct = imp.load_pickle("te_model", "model") |
| |
|
| | return model_punct |
| |
|
| | def initialize(): |
| | |
| | """ Loading the model """ |
| | fit_checkpoint = "WarBot" |
| | tokenizer = AutoTokenizer.from_pretrained(fit_checkpoint) |
| | model = AutoModelForCausalLM.from_pretrained(fit_checkpoint) |
| | model_punсt = prepare_punct() |
| |
|
| | """ Initialize the translational model """ |
| | os.environ['REPLICATE_API_TOKEN'] = '2254e586b1380c49a948fd00d6802d45962492e4' |
| | translation_model_name = "Helsinki-NLP/opus-mt-ru-en" |
| | translation_tokenizer = AutoTokenizer.from_pretrained(translation_model_name) |
| | translation_model = AutoModelForSeq2SeqLM.from_pretrained(translation_model_name) |
| |
|
| | """ Initialize the image model """ |
| | imageModel = replicate.models.get("stability-ai/stable-diffusion") |
| | imgModel_version = imageModel.versions.get("27b93a2413e7f36cd83da926f3656280b2931564ff050bf9575f1fdf9bcd7478") |
| |
|
| | return (model, tokenizer, model_punсt, translation_model, translation_tokenizer, imgModel_version) |
| |
|
| | def translate(text:str,translation_model,translation_tokenizer): |
| | |
| | src = "ru" |
| | trg = "en" |
| |
|
| | try: |
| | batch = translation_tokenizer([text], return_tensors="pt") |
| | generated_ids = translation_model.generate(**batch) |
| | translated = translation_tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0] |
| | except: |
| | translated = "" |
| | return translated |
| |
|
| | def generate_image(prompt:str, imgModel_version): |
| | |
| | prompt = prompt.replace("?","") |
| | try: |
| | output_url = imgModel_version.predict(prompt=prompt)[0] |
| | except: |
| | output_url = "" |
| |
|
| | return output_url |
| |
|
| | def split_string(string,n=256): |
| | return [string[i:i+n] for i in range(0, len(string), n)] |
| |
|
| | def get_response(quote:str,model,tokenizer,model_punct,temperature=0.2): |
| | |
| | try: |
| | user_inpit_ids = tokenizer.encode(f"|0|{get_length_param(quote, tokenizer)}|" \ |
| | + quote + tokenizer.eos_token, return_tensors="pt") |
| | |
| | except: |
| | return "Exception in tokenization" |
| |
|
| | chat_history_ids = user_inpit_ids |
| |
|
| | tokens_count = len(tokenizer.encode(quote)) |
| | if tokens_count < 15: |
| | no_repeat_ngram_size = 2 |
| | else: |
| | no_repeat_ngram_size = 1 |
| |
|
| | try: |
| | output_id = model.generate( |
| | chat_history_ids, |
| | num_return_sequences=1, |
| | max_length=200, |
| | no_repeat_ngram_size=no_repeat_ngram_size, |
| | do_sample=True, |
| | top_k=50, |
| | top_p=0.9, |
| | temperature = temperature, |
| | eos_token_id=tokenizer.eos_token_id, |
| | pad_token_id=tokenizer.pad_token_id, |
| | |
| | ) |
| | except: |
| | return "Exception" |
| |
|
| | response = tokenizer.decode(output_id[0], skip_special_tokens=True) |
| | response = removeSigns(response) |
| | response = response.split(quote)[-1] |
| | response = re.sub(r'[^0-9А-Яа-яЁёa-zA-z;., !()/\-+:?]', '', |
| | response) |
| | response = remove_duplicates(re.sub(r"\d{4,}", "", response)) |
| | response = re.sub(r'\.\.+', '', response) |
| |
|
| | if len(response)>200: |
| | resps = wrap(response,200) |
| | for i in range(len(resps)): |
| | try: |
| | resps[i] = model_punct.enhance_text(resps[i], lan='ru') |
| | response = ''.join(resps) |
| | except: |
| | return "" |
| | else: |
| | response = model_punct.enhance_text(response, lan='ru') |
| |
|
| | |
| | response = re.sub(r'[UNK]', '', response) |
| | response = re.sub(r',+', ',', response) |
| | response = re.sub(r'-+', ',', response) |
| | response = re.sub(r'\.\?', '?', response) |
| | response = re.sub(r'\,\?', '?', response) |
| | response = re.sub(r'\.\!', '!', response) |
| | response = re.sub(r'\.\,', ',', response) |
| | response = re.sub(r'\.\)', '.', response) |
| | response = response.replace('[]', '') |
| |
|
| | |
| | response = chatGPT.uGPT(response,quote) |
| | return response |
| |
|
| |
|
| | if __name__ == '__main__': |
| | """ |
| | quote = "Здравствуй, Жопа, Новый Год, выходи на ёлку!" |
| | model, tokenizer, model_punct = initialize() |
| | response = "" |
| | while not response: |
| | response = get_response(quote, model, tokenizer, model_punct,temperature=0.2) |
| | print(response) |
| | """ |