adarsh8962's picture
Update llava/serve/gradio_utils.py
2d6c974 verified
import torch
from llava.constants import X_TOKEN_INDEX
from llava.conversation import conv_templates, SeparatorStyle
from llava.mm_utils import get_model_name_from_path, KeywordsStoppingCriteria, tokenizer_X_token
from llava.model.builder import load_pretrained_model
from llava.utils import disable_torch_init
# ==== memory-safe, de-hallucinating generation helpers ====
import re
import torch
# deterministic + anti-repeat defaults
GEN_KW = dict(
do_sample=False,
temperature=0.0,
top_p=1.0,
repetition_penalty=1.15, # breaks [[[ spam
no_repeat_ngram_size=3, # avoids short loops
use_cache=False, # reduces VRAM spikes on L4
)
def _big_gpu():
try:
return (torch.cuda.is_available()
and torch.cuda.get_device_properties(0).total_memory / 1024**3 >= 40) # >=40GB = L40S/A100
except Exception:
return False
MAX_NEW_TOKENS_SMALL = 128 # L4 (24 GB VRAM)
MAX_NEW_TOKENS_BIG = 256 # L40S+ (48 GB VRAM)
def build_framewise_prompt(T: int) -> str:
return (
f"You will output exactly {T} plain lines, one per frame.\n"
"Format strictly:\n"
"Frame 1: <<=10 words>\n"
"Frame 2: <<=10 words>\n"
"...\n"
"No brackets [], no JSON, no code blocks, no numbered list other than 'Frame i:'."
)
def keep_frame_lines(text: str, T: int) -> str:
\"\"\"Keep only `Frame i: ...` lines; ensure frames 1..T exist.\"\"\"
lines = []
for ln in text.splitlines():
m = re.match(r\"^Frame\\s+(\\d+)\\s*:\\s*(.+)$\", ln.strip())
if not m:
continue
i = int(m.group(1))
body = \" \".join(m.group(2).split()[:10]) # ≤10 words
if 1 <= i <= T:
lines.append((i, f\"Frame {i}: {body}\"))
have = {i for i,_ in lines}
for i in range(1, T+1):
if i not in have:
lines.append((i, f\"Frame {i}: (no description)\")) # never leaves gaps
return \"\\n\".join(t for _, t in sorted(lines))
# ==== end helpers ====
title_markdown = ("""
<div style="display: flex; justify-content: center; align-items: center; text-align: center;">
<a href="https://github.com/PKU-YuanGroup/Video-LLaVA" style="margin-right: 20px; text-decoration: none; display: flex; align-items: center;">
<img src="https://z1.ax1x.com/2023/11/07/pil4sqH.png" alt="Video-LLaVA🚀" style="max-width: 120px; height: auto;">
</a>
<div>
<h1 >Video-LLaVA: Video-LLaVA: Learning United Visual Representation by Alignment Before Projection</h1>
<h5 style="margin: 0;">If you like our project, please give us a star ✨ on Github for the latest update.</h5>
</div>
</div>
<div align="center">
<div style="display:flex; gap: 0.25rem;" align="center">
<a href='https://github.com/PKU-YuanGroup/Video-LLaVA'><img src='https://img.shields.io/badge/Github-Code-blue'></a>
<a href="https://arxiv.org/pdf/2311.10122.pdf"><img src="https://img.shields.io/badge/Arxiv-2311.10122-red"></a>
<a href='https://github.com/PKU-YuanGroup/Video-LLaVA/stargazers'><img src='https://img.shields.io/github/stars/PKU-YuanGroup/Video-LLaVA.svg?style=social'></a>
</div>
</div>
""")
block_css = """
#buttons button {
min-width: min(120px,100%);
}
"""
tos_markdown = ("""
### Terms of use
By using this service, users are required to agree to the following terms:
The service is a research preview intended for non-commercial use only. It only provides limited safety measures and may generate offensive content. It must not be used for any illegal, harmful, violent, racist, or sexual purposes. The service may collect user dialogue data for future research.
Please click the "Flag" button if you get any inappropriate answer! We will collect those to keep improving our moderator.
For an optimal experience, please use desktop computers for this demo, as mobile devices may compromise its quality.
""")
learn_more_markdown = ("""
### License
The service is a research preview intended for non-commercial use only, subject to the model [License](https://github.com/facebookresearch/llama/blob/main/MODEL_CARD.md) of LLaMA, [Terms of Use](https://openai.com/policies/terms-of-use) of the data generated by OpenAI, and [Privacy Practices](https://chrome.google.com/webstore/detail/sharegpt-share-your-chatg/daiacboceoaocpibfodeljbdfacokfjb) of ShareGPT. Please contact us if you find any potential violation.
""")
class Chat:
def __init__(self, model_path, conv_mode, model_base=None, load_8bit=False, load_4bit=False, device='cuda'):
disable_torch_init()
model_name = get_model_name_from_path(model_path)
self.tokenizer, self.model, processor, context_len = load_pretrained_model(model_path, model_base, model_name,
load_8bit, load_4bit,
device=device)
self.image_processor = processor['image']
self.video_processor = processor['video']
self.conv_mode = conv_mode
self.device = self.model.device
print(self.model)
def get_prompt(self, qs, state):
state.append_message(state.roles[0], qs)
state.append_message(state.roles[1], None)
return state
@torch.inference_mode()
def generate(self, images_tensor: list, prompt: str, first_run: bool, state):
tokenizer, model, image_processor = self.tokenizer, self.model, self.image_processor
state = self.get_prompt(prompt, state)
prompt = state.get_prompt()
print('\n\n\n')
print(prompt)
if 'image' in images_tensor[1] and 'video' not in images_tensor[1]:
input_ids = tokenizer_X_token(prompt, tokenizer, X_TOKEN_INDEX['IMAGE'], return_tensors='pt').unsqueeze(0).to(self.device)
elif 'image' not in images_tensor[1] and 'video' in images_tensor[1]:
input_ids = tokenizer_X_token(prompt, tokenizer, X_TOKEN_INDEX['VIDEO'], return_tensors='pt').unsqueeze(0).to(self.device)
elif 'image' in images_tensor[1] and 'video' in images_tensor[1]:
# <video>\nxxxxxxx\n<image>
'''
tensor([[1, -200, 29871, 13, 3068, 366, 1074, 1716, 278, 1967, 322, 4863, 29973, 319, 1799, 9047, 13566, 29901]])
tensor([[1, -201, 29871, 13]])
'''
print("split: ", prompt.split('\n<image>'))
# print("\n", tokenizer_X_token('\n', tokenizer, X_TOKEN_INDEX['IMAGE'], return_tensors='pt'))
# print("?", tokenizer_X_token('?', tokenizer, X_TOKEN_INDEX['IMAGE'], return_tensors='pt'))
# print("image", tokenizer_X_token('image', tokenizer, X_TOKEN_INDEX['IMAGE'], return_tensors='pt'))
# print("image?", tokenizer_X_token('image?', tokenizer, X_TOKEN_INDEX['IMAGE'], return_tensors='pt'))
# print("USER: <image>\nWhat is unusual about this image?", tokenizer_X_token('USER: <image>\nWhat is unusual about this image?', tokenizer, X_TOKEN_INDEX['IMAGE'], return_tensors='pt'))
input_ids1 = tokenizer_X_token(prompt.split('\n<image>')[0], tokenizer, X_TOKEN_INDEX['VIDEO'], return_tensors='pt').unsqueeze(0).to(self.device)
print('input_ids1', input_ids1)
input_ids2 = tokenizer_X_token(prompt.split('\n<image>')[-1], tokenizer, X_TOKEN_INDEX['VIDEO'], return_tensors='pt').unsqueeze(0).to(self.device)
print('input_ids2', input_ids2)
input_ids3 = tokenizer_X_token('\n<image>', tokenizer, X_TOKEN_INDEX['IMAGE'], return_tensors='pt').unsqueeze(0).to(self.device)
print('input_ids3', input_ids3)
input_ids = torch.cat([input_ids1, input_ids3[:, 1:], input_ids2[:, 1:]], dim=-1)
print('input_ids', input_ids)
print(*[tokenizer.decode(i) for i in input_ids2[0]])
else:
input_ids = tokenizer_X_token(prompt, tokenizer, X_TOKEN_INDEX['IMAGE'], return_tensors='pt').unsqueeze(0).to(self.device)
temperature = 0.1
max_new_tokens = 1024
stop_str = conv_templates[self.conv_mode].copy().sep if conv_templates[self.conv_mode].copy().sep_style != SeparatorStyle.TWO else \
conv_templates[self.conv_mode].copy().sep2
keywords = [stop_str]
stopping_criteria = KeywordsStoppingCriteria(keywords, tokenizer, input_ids)
# streamer = TextStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
# print(input_ids, images_tensor[0][0].shape)
with torch.inference_mode():
# infer how many frames actually went in (works for list-of-frames or tensors)
def _infer_T(imgs):
try:
if isinstance(imgs, (list, tuple)) and len(imgs) > 0:
first = imgs[0]
if isinstance(first, (list, tuple)):
return len(first)
if hasattr(first, "shape"):
shp = list(first.shape)
if len(shp) >= 4: # [T, C, H, W] or [1, T, C, H, W]
return int(shp[0])
except Exception:
pass
return 8 # safe default
_T = _infer_T(images_tensor)
# VRAM-aware cap: more frames → allow a few more tokens, but stay safe on L4
max_new_tokens = min(16 * max(1, _T), MAX_NEW_TOKENS_BIG if _big_gpu() else MAX_NEW_TOKENS_SMALL)
output_ids = model.generate(
input_ids,
images=images_tensor,
max_new_tokens=max_new_tokens,
**GEN_KW, # <- deterministic + lower VRAM
stopping_criteria=[stopping_criteria],
)
input_token_len = input_ids.shape[1]
n_diff_input_output = (input_ids != output_ids[:, :input_token_len]).sum().item()
if n_diff_input_output > 0:
print(f'[Warning] {n_diff_input_output} output_ids are not the same as the input_ids')
outputs = tokenizer.batch_decode(output_ids[:, input_token_len:], skip_special_tokens=True)[0]
outputs = outputs.strip()
# If user asked about frames, force a clean "Frame i: ..." list
try:
_T = _infer_T(images_tensor)
except Exception:
_T = 8
if "frame" in prompt.lower():
cleaned = keep_frame_lines(outputs, _T)
if cleaned.strip():
outputs = cleaned
print("response", outputs)
return outputs, state