Spaces:
Running
on
Zero
Running
on
Zero
import bpy, os | |
from collections import defaultdict | |
from tqdm import tqdm | |
import numpy as np | |
from numpy import ndarray | |
from typing import Dict, Tuple, List, Optional, Union | |
import trimesh | |
import fast_simplification | |
from scipy.spatial import KDTree | |
import argparse | |
import yaml | |
from box import Box | |
import os | |
from .log import new_entry, add_error, add_warning, new_log, end_log | |
from .raw_data import RawData | |
def load(filepath: str): | |
old_objs = set(bpy.context.scene.objects) | |
if not os.path.exists(filepath): | |
raise ValueError(f'File {filepath} does not exist !') | |
try: | |
if filepath.endswith(".vrm"): | |
# enable vrm addon and load vrm model | |
bpy.ops.preferences.addon_enable(module='vrm') | |
bpy.ops.import_scene.vrm( | |
filepath=filepath, | |
use_addon_preferences=True, | |
extract_textures_into_folder=False, | |
make_new_texture_folder=False, | |
set_shading_type_to_material_on_import=False, | |
set_view_transform_to_standard_on_import=True, | |
set_armature_display_to_wire=True, | |
set_armature_display_to_show_in_front=True, | |
set_armature_bone_shape_to_default=True, | |
disable_bake=True, # customized option for better performance | |
) | |
elif filepath.endswith(".obj"): | |
bpy.ops.wm.obj_import(filepath=filepath) | |
elif filepath.endswith(".fbx") or filepath.endswith(".FBX"): | |
# end bone is removed using remove_dummy_bone | |
bpy.ops.import_scene.fbx(filepath=filepath, ignore_leaf_bones=False, use_image_search=False) | |
elif filepath.endswith(".glb") or filepath.endswith(".gltf"): | |
bpy.ops.import_scene.gltf(filepath=filepath, import_pack_images=False) | |
elif filepath.endswith(".dae"): | |
bpy.ops.wm.collada_import(filepath=filepath) | |
elif filepath.endswith(".blend"): | |
with bpy.data.libraries.load(filepath) as (data_from, data_to): | |
data_to.objects = data_from.objects | |
for obj in data_to.objects: | |
if obj is not None: | |
bpy.context.collection.objects.link(obj) | |
else: | |
raise ValueError(f"not suported type {filepath}") | |
except: | |
raise ValueError(f"failed to load {filepath}") | |
armature = [x for x in set(bpy.context.scene.objects)-old_objs if x.type=="ARMATURE"] | |
if len(armature)==0: | |
return None | |
if len(armature)>1: | |
raise ValueError(f"multiple armatures found") | |
armature = armature[0] | |
armature.select_set(True) | |
bpy.context.view_layer.objects.active = armature | |
bpy.ops.object.mode_set(mode='EDIT') | |
for bone in bpy.data.armatures[0].edit_bones: | |
bone.roll = 0. # change all roll to 0. to prevent weird behaviour | |
bpy.ops.object.mode_set(mode='OBJECT') | |
armature.select_set(False) | |
bpy.ops.object.select_all(action='DESELECT') | |
return armature | |
# remove all data in bpy | |
def clean_bpy(): | |
# First try to purge orphan data | |
try: | |
bpy.ops.outliner.orphans_purge(do_local_ids=True, do_linked_ids=True, do_recursive=True) | |
except Exception as e: | |
print(f"Warning: Could not purge orphans: {e}") | |
# Then remove all data by type | |
data_types = [ | |
bpy.data.actions, | |
bpy.data.armatures, | |
bpy.data.cameras, | |
bpy.data.collections, | |
bpy.data.curves, | |
bpy.data.images, | |
bpy.data.lights, | |
bpy.data.materials, | |
bpy.data.meshes, | |
bpy.data.objects, | |
bpy.data.textures, | |
bpy.data.worlds, | |
bpy.data.node_groups | |
] | |
for data_collection in data_types: | |
try: | |
for item in data_collection: | |
try: | |
data_collection.remove(item) | |
except Exception as e: | |
print(f"Warning: Could not remove {item.name} from {data_collection}: {e}") | |
except Exception as e: | |
print(f"Warning: Error processing {data_collection}: {e}") | |
# Force garbage collection to free memory | |
import gc | |
gc.collect() | |
def get_arranged_bones(armature): | |
matrix_world = armature.matrix_world | |
arranged_bones = [] | |
root = armature.pose.bones[0] | |
while root.parent is not None: | |
root = root.parent | |
Q = [root] | |
rot = np.array(matrix_world)[:3, :3] | |
# dfs and sort | |
while len(Q) != 0: | |
b = Q.pop(0) | |
arranged_bones.append(b) | |
children = [] | |
for cb in b.children: | |
head = rot @ np.array(b.head) | |
children.append((cb, head[0], head[1], head[2])) | |
children = sorted(children, key=lambda x: (x[3], x[1], x[2])) | |
_c = [x[0] for x in children] | |
Q = _c + Q | |
return arranged_bones | |
def process_mesh(): | |
meshes = [] | |
for v in bpy.data.objects: | |
if v.type == 'MESH': | |
meshes.append(v) | |
_dict_mesh = {} | |
for obj in meshes: | |
m = np.array(obj.matrix_world) | |
matrix_world_rot = m[:3, :3] | |
matrix_world_bias = m[:3, 3] | |
rot = matrix_world_rot | |
total_vertices = len(obj.data.vertices) | |
vertex = np.zeros((4, total_vertices)) | |
vertex_normal = np.zeros((total_vertices, 3)) | |
obj_verts = obj.data.vertices | |
faces = [] | |
normals = [] | |
for v in obj_verts: | |
vertex_normal[v.index] = rot @ np.array(v.normal) # be careful ! | |
vv = rot @ v.co | |
vv = np.array(vv) + matrix_world_bias | |
vertex[0:3, v.index] = vv | |
vertex[3][v.index] = 1 # affine coordinate | |
for polygon in obj.data.polygons: | |
edges = polygon.edge_keys | |
nodes = [] | |
adj = {} | |
for edge in edges: | |
if adj.get(edge[0]) is None: | |
adj[edge[0]] = [] | |
adj[edge[0]].append(edge[1]) | |
if adj.get(edge[1]) is None: | |
adj[edge[1]] = [] | |
adj[edge[1]].append(edge[0]) | |
nodes.append(edge[0]) | |
nodes.append(edge[1]) | |
normal = polygon.normal | |
nodes = list(set(sorted(nodes))) | |
first = nodes[0] | |
loop = [] | |
now = first | |
vis = {} | |
while True: | |
loop.append(now) | |
vis[now] = True | |
if vis.get(adj[now][0]) is None: | |
now = adj[now][0] | |
elif vis.get(adj[now][1]) is None: | |
now = adj[now][1] | |
else: | |
break | |
for (second, third) in zip(loop[1:], loop[2:]): | |
faces.append((first + 1, second + 1, third + 1)) # the cursed +1 | |
normals.append(rot @ normal) # and the cursed normal of BLENDER | |
correct_faces = [] | |
for (i, face) in enumerate(faces): | |
normal = normals[i] | |
v0 = face[0] - 1 | |
v1 = face[1] - 1 | |
v2 = face[2] - 1 | |
v = np.cross( | |
vertex[:3, v1] - vertex[:3, v0], | |
vertex[:3, v2] - vertex[:3, v0], | |
) | |
if (v*normal).sum() > 0: | |
correct_faces.append(face) | |
else: | |
correct_faces.append((face[0], face[2], face[1])) | |
if len(correct_faces) > 0: | |
_dict_mesh[obj.name] = { | |
'vertex': vertex, | |
'face': correct_faces, | |
} | |
vertex = np.concatenate([_dict_mesh[name]['vertex'] for name in _dict_mesh], axis=1)[:3, :].transpose() | |
total_faces = 0 | |
now_bias = 0 | |
for name in _dict_mesh: | |
total_faces += len(_dict_mesh[name]['face']) | |
faces = np.zeros((total_faces, 3), dtype=np.int64) | |
tot = 0 | |
for name in _dict_mesh: | |
f = np.array(_dict_mesh[name]['face'], dtype=np.int64) | |
faces[tot:tot+f.shape[0]] = f + now_bias | |
now_bias += _dict_mesh[name]['vertex'].shape[1] | |
tot += f.shape[0] | |
return vertex, faces | |
def process_armature( | |
armature, | |
arranged_bones, | |
) -> Tuple[np.ndarray, np.ndarray]: | |
matrix_world = armature.matrix_world | |
index = {} | |
for (id, pbone) in enumerate(arranged_bones): | |
index[pbone.name] = id | |
root = armature.pose.bones[0] | |
while root.parent is not None: | |
root = root.parent | |
m = np.array(matrix_world.to_4x4()) | |
scale_inv = np.linalg.inv(np.diag(matrix_world.to_scale())) | |
rot = m[:3, :3] | |
bias = m[:3, 3] | |
s = [] | |
bpy.ops.object.editmode_toggle() | |
edit_bones = armature.data.edit_bones | |
J = len(arranged_bones) | |
joints = np.zeros((J, 3), dtype=np.float32) | |
tails = np.zeros((J, 3), dtype=np.float32) | |
parents = [] | |
name_to_id = {} | |
names = [] | |
matrix_local_stack = np.zeros((J, 4, 4), dtype=np.float32) | |
for (id, pbone) in enumerate(arranged_bones): | |
name = pbone.name | |
names.append(name) | |
matrix_local = np.array(pbone.bone.matrix_local) | |
use_inherit_rotation = pbone.bone.use_inherit_rotation | |
if use_inherit_rotation == False: | |
add_warning(f"use_inherit_rotation of bone {name} is False !") | |
head = rot @ matrix_local[0:3, 3] + bias | |
s.append(head) | |
edit_bone = edit_bones.get(name) | |
tail = rot @ np.array(edit_bone.tail) + bias | |
name_to_id[name] = id | |
joints[id] = head | |
tails[id] = tail | |
parents.append(None if pbone.parent not in arranged_bones else name_to_id[pbone.parent.name]) | |
# remove scale part | |
matrix_local[:, 3:4] = m @ matrix_local[:, 3:4] | |
matrix_local[:3, :3] = scale_inv @ matrix_local[:3, :3] | |
matrix_local_stack[id] = matrix_local | |
bpy.ops.object.editmode_toggle() | |
return joints, tails, parents, names, matrix_local_stack | |
def save_raw_data( | |
path: str, | |
vertices: ndarray, | |
faces: ndarray, | |
joints: Union[ndarray, None], | |
tails: Union[ndarray, None], | |
parents: Union[List[Union[int, None]], None], | |
names: Union[List[str], None], | |
matrix_local: Union[ndarray, None], | |
target_count: int, | |
): | |
mesh = trimesh.Trimesh(vertices=vertices, faces=faces) | |
vertices = np.array(mesh.vertices, dtype=np.float32) | |
faces = np.array(mesh.faces, dtype=np.int64) | |
if faces.shape[0] > target_count: | |
vertices, faces = fast_simplification.simplify(vertices, faces, target_count=target_count) | |
mesh = trimesh.Trimesh(vertices=vertices, faces=faces) | |
new_vertices = np.array(mesh.vertices, dtype=np.float32) | |
new_vertex_normals = np.array(mesh.vertex_normals, dtype=np.float32) | |
new_faces = np.array(mesh.faces, dtype=np.int64) | |
new_face_normals = np.array(mesh.face_normals, dtype=np.float32) | |
if joints is not None: | |
new_joints = np.array(joints, dtype=np.float32) | |
else: | |
new_joints = None | |
raw_data = RawData( | |
vertices=new_vertices, | |
vertex_normals=new_vertex_normals, | |
faces=new_faces, | |
face_normals=new_face_normals, | |
joints=new_joints, | |
tails=tails, | |
skin=None, | |
no_skin=None, | |
parents=parents, | |
names=names, | |
matrix_local=matrix_local, | |
) | |
raw_data.check() | |
raw_data.save(path=path) | |
def extract_builtin( | |
output_folder: str, | |
target_count: int, | |
num_runs: int, | |
id: int, | |
time: str, | |
files: List[Union[str, str]], | |
): | |
log_path = "./logs" | |
log_path = os.path.join(log_path, time) | |
num_files = len(files) | |
gap = num_files // num_runs | |
start = gap * id | |
end = gap * (id + 1) | |
if id+1==num_runs: | |
end = num_files | |
files = sorted(files) | |
if end!=-1: | |
files = files[:end] | |
new_log(log_path, f"extract_builtin_{start}_{end}") | |
tot = 0 | |
for file in tqdm(files[start:]): | |
input_file = file[0] | |
output_dir = file[1] | |
clean_bpy() | |
new_entry(input_file) | |
try: | |
print(f"Now processing {input_file}...") | |
armature = load(input_file) | |
print('save to:', output_dir) | |
os.makedirs(output_dir, exist_ok=True) | |
vertices, faces = process_mesh() | |
if armature is not None: | |
arranged_bones = get_arranged_bones(armature) | |
joints, tails, parents, names, matrix_local = process_armature(armature, arranged_bones) | |
else: | |
joints = None | |
tails = None | |
parents = None | |
names = None | |
matrix_local = None | |
save_file = os.path.join(output_dir, 'raw_data.npz') | |
save_raw_data( | |
path=save_file, | |
vertices=vertices, | |
faces=faces-1, | |
joints=joints, | |
tails=tails, | |
parents=parents, | |
names=names, | |
matrix_local=matrix_local, | |
target_count=target_count, | |
) | |
tot += 1 | |
except ValueError as e: | |
add_error(str(e)) | |
print(f"ValueError: {str(e)}") | |
except RuntimeError as e: | |
add_error(str(e)) | |
print(f"RuntimeError: {str(e)}") | |
except TimeoutError as e: | |
add_error("time out") | |
print("TimeoutError: Processing timed out") | |
except Exception as e: | |
add_error(f"Unexpected error: {str(e)}") | |
print(f"Unexpected error: {str(e)}") | |
end_log() | |
print(f"{tot} models processed") | |
def str2bool(v): | |
if isinstance(v, bool): | |
return v | |
if v.lower() in ('yes', 'true', 't', 'y', '1'): | |
return True | |
elif v.lower() in ('no', 'false', 'f', 'n', '0'): | |
return False | |
else: | |
raise argparse.ArgumentTypeError('Boolean value expected.') | |
def nullable_string(val): | |
if not val: | |
return None | |
return val | |
def get_files( | |
data_name: str, | |
input_dataset_dir: str, | |
output_dataset_dir: str, | |
inputs: Union[str, None]=None, | |
require_suffix: List[str]=['obj','fbx','FBX','dae','glb','gltf','vrm'], | |
force_override: bool=False, | |
warning: bool=True, | |
) -> List[Tuple[str, str]]: | |
files = [] # (input_file, output_dir) | |
if inputs is not None: # specified input file(s) | |
vis = {} | |
inputs = inputs.split(',') | |
for file in inputs: | |
file_name = file.removeprefix("./") | |
# remove suffix | |
file_name = '.'.join(file_name.split('.')[:-1]) | |
output_dir = os.path.join(output_dataset_dir, file_name) | |
raw_data_npz = os.path.join(output_dir, data_name) | |
if not force_override and os.path.exists(raw_data_npz): | |
continue | |
if warning and output_dir in vis: | |
print(f"\033[33mWARNING: duplicate output directory: {output_dir}, you need to rename prefix of files to avoid ambiguity\033[0m") | |
vis[output_dir] = True | |
files.append((file, output_dir)) | |
else: | |
vis = {} | |
for root, dirs, f in os.walk(input_dataset_dir): | |
for file in f: | |
if file.split('.')[-1] in require_suffix: | |
file_name = file.removeprefix("./") | |
# remove suffix | |
file_name = '.'.join(file_name.split('.')[:-1]) | |
output_dir = os.path.join(output_dataset_dir, os.path.relpath(root, input_dataset_dir), file_name) | |
raw_data_npz = os.path.join(output_dir, data_name) | |
# Check if all required files exist | |
if not force_override and os.path.exists(raw_data_npz): | |
continue | |
if warning and output_dir in vis: | |
print(f"\033[33mWARNING: duplicate output directory: {output_dir}, you need to rename prefix of files to avoid ambiguity\033[0m") | |
vis[output_dir] = True | |
files.append((os.path.join(root, file), output_dir)) | |
return files | |
def parse(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument('--config', type=str, required=True) | |
parser.add_argument('--require_suffix', type=str, required=True) | |
parser.add_argument('--faces_target_count', type=int, required=True) | |
parser.add_argument('--num_runs', type=int, required=True) | |
parser.add_argument('--force_override', type=str2bool, required=True) | |
parser.add_argument('--id', type=int, required=True) | |
parser.add_argument('--time', type=str, required=True) | |
parser.add_argument('--input', type=nullable_string, required=False, default=None) | |
parser.add_argument('--input_dir', type=nullable_string, required=False, default=None) | |
parser.add_argument('--output_dir', type=nullable_string, required=False, default=None) | |
return parser.parse_args() | |
if __name__ == "__main__": | |
args = parse() | |
config = Box(yaml.safe_load(open(args.config, "r"))) | |
num_runs = args.num_runs | |
id = args.id | |
timestamp = args.time | |
require_suffix = args.require_suffix.split(',') | |
force_override = args.force_override | |
target_count = args.faces_target_count | |
if args.input_dir: | |
config.input_dataset_dir = args.input_dir | |
if args.output_dir: | |
config.output_dataset_dir = args.output_dir | |
assert config.input_dataset_dir is not None or args.input is None, 'you cannot specify both input and input_dir' | |
files = get_files( | |
data_name='raw_data.npz', | |
inputs=args.input, | |
input_dataset_dir=config.input_dataset_dir, | |
output_dataset_dir=config.output_dataset_dir, | |
require_suffix=require_suffix, | |
force_override=force_override, | |
warning=True, | |
) | |
extract_builtin( | |
output_folder=config.output_dataset_dir, | |
target_count=target_count, | |
num_runs=num_runs, | |
id=id, | |
time=timestamp, | |
files=files, | |
) |