Spaces:
Build error
Build error
from pathlib import Path | |
import os | |
import shutil | |
import string | |
import secrets | |
import hashlib | |
import random | |
import time | |
import re | |
def get_files_count(directory_path): | |
return len(os.listdir(directory_path)) | |
def generate_random_string(length=10): | |
characters = string.ascii_letters | |
random_string = ''.join(secrets.choice(characters) for _ in range(length)) | |
return random_string | |
def generate_random_string_from_input(input_string, length=16): | |
# Hash the input string to get a consistent value | |
hash_object = hashlib.sha256(input_string.encode()) | |
hashed_string = hash_object.hexdigest() | |
# Use the hash to seed the random number generator | |
random.seed(hashed_string) | |
# Generate a random string based on the seed | |
characters = string.ascii_letters + string.digits | |
random_string = ''.join(random.choice(characters) for _ in range(length)) | |
return random_string | |
def is_mostly_black(frame, black_threshold=20, percentage_threshold=0.9, sample_rate=10): | |
""" | |
Fast black frame detection using pixel sampling. | |
Args: | |
frame: OpenCV BGR frame (NumPy array) | |
black_threshold: grayscale value below which a pixel is considered black | |
percentage_threshold: fraction of black pixels to consider frame mostly black | |
sample_rate: sample every N-th pixel in both dimensions (higher = faster) | |
Returns: | |
True if mostly black, False otherwise | |
""" | |
import cv2 | |
import numpy as np | |
if frame is None or frame.size == 0: | |
return True | |
# Convert to grayscale | |
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) | |
# Sample pixels | |
sampled = gray[::sample_rate, ::sample_rate] | |
black_count = np.sum(sampled < black_threshold) | |
total_count = sampled.size | |
return (black_count / total_count) >= percentage_threshold | |
def only_alpha(text: str) -> str: | |
# Keep only alphabetic characters (make lowercase to ignore case) | |
return re.sub(r'[^a-zA-Z]', '', text).lower() | |
def manage_gpu(size_gb: float = 0, gpu_index: int = 0, action: str = "check"): | |
""" | |
Manage GPU memory: | |
- check → just prints memory + process table | |
- clear_cache → clears PyTorch cache | |
- kill → kills all GPU processes | |
""" | |
try: | |
import pynvml,signal, gc | |
pynvml.nvmlInit() | |
handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_index) | |
info = pynvml.nvmlDeviceGetMemoryInfo(handle) | |
free_gb = info.free / 1024**3 | |
total_gb = info.total / 1024**3 | |
print(f"\nGPU {gpu_index}: Free {free_gb:.2f} GB / Total {total_gb:.2f} GB") | |
# Show processes | |
processes = pynvml.nvmlDeviceGetComputeRunningProcesses(handle) | |
print("\nActive GPU Processes:") | |
print(f"{'PID':<8} {'Process Name':<40} {'Used (GB)':<10}") | |
print("-" * 60) | |
for p in processes: | |
used_gb = p.usedGpuMemory / 1024**3 | |
proc_name = pynvml.nvmlSystemGetProcessName(p.pid).decode(errors="ignore") | |
print(f"{p.pid:<8} {proc_name:<40} {used_gb:.2f}") | |
if action == "clear_cache": | |
try: | |
import torch | |
gc.collect() | |
gc.collect() | |
torch.cuda.empty_cache() | |
torch.cuda.reset_peak_memory_stats() | |
torch.cuda.synchronize() | |
time.sleep(1) | |
print("\n🧹 Cleared PyTorch CUDA cache") | |
except ImportError: | |
print("\n⚠️ PyTorch not installed, cannot clear cache.") | |
elif action == "kill": | |
for p in processes: | |
proc_name = pynvml.nvmlSystemGetProcessName(p.pid).decode(errors="ignore") | |
try: | |
os.kill(p.pid, signal.SIGKILL) | |
print(f"❌ Killed {p.pid} ({proc_name})") | |
except Exception as e: | |
print(f"⚠️ Could not kill {p.pid}: {e}") | |
manage_gpu(action="clear_cache") | |
gc.collect() | |
gc.collect() | |
return free_gb > size_gb | |
except: return False | |
def is_gpu_available(verbose=True): | |
import torch | |
if not torch.cuda.is_available(): | |
if verbose: | |
print("CUDA not available.") | |
return False | |
try: | |
# Try a tiny allocation to check if GPU is free & usable | |
torch.empty(1, device="cuda") | |
if verbose: | |
print(f"CUDA available. Using device: {torch.cuda.get_device_name(0)}") | |
return True | |
except RuntimeError as e: | |
if "CUDA-capable device(s) is/are busy or unavailable" in str(e) or \ | |
"CUDA error" in str(e): | |
if verbose: | |
print("CUDA detected but busy/unavailable. Please CPU.") | |
return False | |
raise # re-raise if it's some other unexpected error | |
def get_device(is_vision=False): | |
if not is_vision and os.getenv("USE_CPU_IF_POSSIBLE", None): | |
return "cpu" | |
else: | |
return "cuda" if is_gpu_available() else "cpu" |