Spaces:
Sleeping
Sleeping
File size: 4,290 Bytes
b99ca3e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
from functools import lru_cache
from typing import List
import numpy
from tqdm import tqdm
from facefusion import inference_manager, state_manager, wording
from facefusion.download import conditional_download_hashes, conditional_download_sources, resolve_download_url
from facefusion.filesystem import resolve_relative_path
from facefusion.thread_helper import conditional_thread_semaphore
from facefusion.types import Detection, DownloadScope, Fps, InferencePool, ModelOptions, ModelSet, Score, VisionFrame
from facefusion.vision import detect_video_fps, fit_frame, read_image, read_video_frame
STREAM_COUNTER = 0
@lru_cache(maxsize = None)
def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
return\
{
'yolo_nsfw':
{
'hashes':
{
'content_analyser':
{
'url': resolve_download_url('models-3.2.0', 'yolo_11m_nsfw.hash'),
'path': resolve_relative_path('../.assets/models/yolo_11m_nsfw.hash')
}
},
'sources':
{
'content_analyser':
{
'url': resolve_download_url('models-3.2.0', 'yolo_11m_nsfw.onnx'),
'path': resolve_relative_path('../.assets/models/yolo_11m_nsfw.onnx')
}
},
'size': (640, 640)
}
}
def get_inference_pool() -> InferencePool:
model_names = [ 'yolo_nsfw' ]
model_source_set = get_model_options().get('sources')
return inference_manager.get_inference_pool(__name__, model_names, model_source_set)
def clear_inference_pool() -> None:
model_names = [ 'yolo_nsfw' ]
inference_manager.clear_inference_pool(__name__, model_names)
def get_model_options() -> ModelOptions:
return create_static_model_set('full').get('yolo_nsfw')
def pre_check() -> bool:
model_hash_set = get_model_options().get('hashes')
model_source_set = get_model_options().get('sources')
return conditional_download_hashes(model_hash_set) and conditional_download_sources(model_source_set)
def analyse_stream(vision_frame : VisionFrame, video_fps : Fps) -> bool:
global STREAM_COUNTER
STREAM_COUNTER = STREAM_COUNTER + 1
if STREAM_COUNTER % int(video_fps) == 0:
return analyse_frame(vision_frame)
return False
def analyse_frame(vision_frame : VisionFrame) -> bool:
nsfw_scores = detect_nsfw(vision_frame)
return len(nsfw_scores) > 0
@lru_cache(maxsize = None)
def analyse_image(image_path : str) -> bool:
vision_frame = read_image(image_path)
return analyse_frame(vision_frame)
@lru_cache(maxsize = None)
def analyse_video(video_path : str, trim_frame_start : int, trim_frame_end : int) -> bool:
video_fps = detect_video_fps(video_path)
frame_range = range(trim_frame_start, trim_frame_end)
rate = 0.0
total = 0
counter = 0
with tqdm(total = len(frame_range), desc = wording.get('analysing'), unit = 'frame', ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress:
for frame_number in frame_range:
if frame_number % int(video_fps) == 0:
vision_frame = read_video_frame(video_path, frame_number)
total += 1
if analyse_frame(vision_frame):
counter += 1
if counter > 0 and total > 0:
rate = counter / total * 100
progress.set_postfix(rate = rate)
progress.update()
return rate > 10.0
def detect_nsfw(vision_frame : VisionFrame) -> List[Score]:
nsfw_scores = []
model_size = get_model_options().get('size')
temp_vision_frame = fit_frame(vision_frame, model_size)
detect_vision_frame = prepare_detect_frame(temp_vision_frame)
detection = forward(detect_vision_frame)
detection = numpy.squeeze(detection).T
nsfw_scores_raw = numpy.amax(detection[:, 4:], axis = 1)
keep_indices = numpy.where(nsfw_scores_raw > 0.2)[0]
if numpy.any(keep_indices):
nsfw_scores_raw = nsfw_scores_raw[keep_indices]
nsfw_scores = nsfw_scores_raw.ravel().tolist()
return nsfw_scores
def forward(vision_frame : VisionFrame) -> Detection:
content_analyser = get_inference_pool().get('content_analyser')
with conditional_thread_semaphore():
detection = content_analyser.run(None,
{
'input': vision_frame
})
return detection
def prepare_detect_frame(temp_vision_frame : VisionFrame) -> VisionFrame:
detect_vision_frame = temp_vision_frame / 255.0
detect_vision_frame = numpy.expand_dims(detect_vision_frame.transpose(2, 0, 1), axis = 0).astype(numpy.float32)
return detect_vision_frame
|