initial commit

rebranding everything
This commit is contained in:
Kenneth Estanislao
2023-09-24 21:36:57 +08:00
parent f522c4ea89
commit e616245e3d
26 changed files with 2351 additions and 2 deletions

View File

View File

View File

@@ -0,0 +1,72 @@
import sys
import importlib
from concurrent.futures import ThreadPoolExecutor
from types import ModuleType
from typing import Any, List, Callable
from tqdm import tqdm
import modules
import modules.globals
FRAME_PROCESSORS_MODULES: List[ModuleType] = []
FRAME_PROCESSORS_INTERFACE = [
'pre_check',
'pre_start',
'process_frame',
'process_image',
'process_video'
]
def load_frame_processor_module(frame_processor: str) -> Any:
try:
frame_processor_module = importlib.import_module(f'modules.processors.frame.{frame_processor}')
for method_name in FRAME_PROCESSORS_INTERFACE:
if not hasattr(frame_processor_module, method_name):
sys.exit()
except ImportError:
sys.exit()
return frame_processor_module
def get_frame_processors_modules(frame_processors: List[str]) -> List[ModuleType]:
global FRAME_PROCESSORS_MODULES
if not FRAME_PROCESSORS_MODULES:
for frame_processor in frame_processors:
frame_processor_module = load_frame_processor_module(frame_processor)
FRAME_PROCESSORS_MODULES.append(frame_processor_module)
set_frame_processors_modules_from_ui(frame_processors)
return FRAME_PROCESSORS_MODULES
def set_frame_processors_modules_from_ui(frame_processors: List[str]) -> None:
global FRAME_PROCESSORS_MODULES
for frame_processor, state in modules.globals.fp_ui.items():
if state == True and frame_processor not in frame_processors:
frame_processor_module = load_frame_processor_module(frame_processor)
FRAME_PROCESSORS_MODULES.append(frame_processor_module)
modules.globals.frame_processors.append(frame_processor)
if state == False:
frame_processor_module = load_frame_processor_module(frame_processor)
try:
FRAME_PROCESSORS_MODULES.remove(frame_processor_module)
modules.globals.frame_processors.remove(frame_processor)
except:
pass
def multi_process_frame(source_path: str, temp_frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None], progress: Any = None) -> None:
with ThreadPoolExecutor(max_workers=modules.globals.execution_threads) as executor:
futures = []
for path in temp_frame_paths:
future = executor.submit(process_frames, source_path, [path], progress)
futures.append(future)
for future in futures:
future.result()
def process_video(source_path: str, frame_paths: list[str], process_frames: Callable[[str, List[str], Any], None]) -> None:
progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
total = len(frame_paths)
with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress:
progress.set_postfix({'execution_providers': modules.globals.execution_providers, 'execution_threads': modules.globals.execution_threads, 'max_memory': modules.globals.max_memory})
multi_process_frame(source_path, frame_paths, process_frames, progress)

View File

@@ -0,0 +1,75 @@
from typing import Any, List
import cv2
import threading
import gfpgan
import modules.globals
import modules.processors.frame.core
from modules.core import update_status
from modules.face_analyser import get_one_face
from modules.typing import Frame, Face
from modules.utilities import conditional_download, resolve_relative_path, is_image, is_video
FACE_ENHANCER = None
THREAD_SEMAPHORE = threading.Semaphore()
THREAD_LOCK = threading.Lock()
NAME = 'DLC.FACE-ENHANCER'
def pre_check() -> bool:
download_directory_path = resolve_relative_path('..\models')
conditional_download(download_directory_path, ['https://github.com/TencentARC/GFPGAN/releases/download/v1.3.4/GFPGANv1.4.pth'])
return True
def pre_start() -> bool:
if not is_image(modules.globals.target_path) and not is_video(modules.globals.target_path):
update_status('Select an image or video for target path.', NAME)
return False
return True
def get_face_enhancer() -> Any:
global FACE_ENHANCER
with THREAD_LOCK:
if FACE_ENHANCER is None:
model_path = resolve_relative_path('..\models\GFPGANv1.4.pth')
# todo: set models path https://github.com/TencentARC/GFPGAN/issues/399
FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1) # type: ignore[attr-defined]
return FACE_ENHANCER
def enhance_face(temp_frame: Frame) -> Frame:
with THREAD_SEMAPHORE:
_, _, temp_frame = get_face_enhancer().enhance(
temp_frame,
paste_back=True
)
return temp_frame
def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
target_face = get_one_face(temp_frame)
if target_face:
temp_frame = enhance_face(temp_frame)
return temp_frame
def process_frames(source_path: str, temp_frame_paths: List[str], progress: Any = None) -> None:
for temp_frame_path in temp_frame_paths:
temp_frame = cv2.imread(temp_frame_path)
result = process_frame(None, temp_frame)
cv2.imwrite(temp_frame_path, result)
if progress:
progress.update(1)
def process_image(source_path: str, target_path: str, output_path: str) -> None:
target_frame = cv2.imread(target_path)
result = process_frame(None, target_frame)
cv2.imwrite(output_path, result)
def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
modules.processors.frame.core.process_video(None, temp_frame_paths, process_frames)

View File

@@ -0,0 +1,86 @@
from typing import Any, List
import cv2
import insightface
import threading
import modules.globals
import modules.processors.frame.core
from modules.core import update_status
from modules.face_analyser import get_one_face, get_many_faces
from modules.typing import Face, Frame
from modules.utilities import conditional_download, resolve_relative_path, is_image, is_video
FACE_SWAPPER = None
THREAD_LOCK = threading.Lock()
NAME = 'DLC.FACE-SWAPPER'
def pre_check() -> bool:
download_directory_path = resolve_relative_path('../models')
conditional_download(download_directory_path, ['https://github.com/facefusion/facefusion-assets/releases/download/models/inswapper_128.onnx'])
return True
def pre_start() -> bool:
if not is_image(modules.globals.source_path):
update_status('Select an image for source path.', NAME)
return False
elif not get_one_face(cv2.imread(modules.globals.source_path)):
update_status('No face in source path detected.', NAME)
return False
if not is_image(modules.globals.target_path) and not is_video(modules.globals.target_path):
update_status('Select an image or video for target path.', NAME)
return False
return True
def get_face_swapper() -> Any:
global FACE_SWAPPER
with THREAD_LOCK:
if FACE_SWAPPER is None:
model_path = resolve_relative_path('../models/inswapper_128.onnx')
FACE_SWAPPER = insightface.model_zoo.get_model(model_path, providers=modules.globals.execution_providers)
return FACE_SWAPPER
def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
return get_face_swapper().get(temp_frame, target_face, source_face, paste_back=True)
def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
if modules.globals.many_faces:
many_faces = get_many_faces(temp_frame)
if many_faces:
for target_face in many_faces:
temp_frame = swap_face(source_face, target_face, temp_frame)
else:
target_face = get_one_face(temp_frame)
if target_face:
temp_frame = swap_face(source_face, target_face, temp_frame)
return temp_frame
def process_frames(source_path: str, temp_frame_paths: List[str], progress: Any = None) -> None:
source_face = get_one_face(cv2.imread(source_path))
for temp_frame_path in temp_frame_paths:
temp_frame = cv2.imread(temp_frame_path)
try:
result = process_frame(source_face, temp_frame)
cv2.imwrite(temp_frame_path, result)
except Exception as exception:
print(exception)
pass
if progress:
progress.update(1)
def process_image(source_path: str, target_path: str, output_path: str) -> None:
source_face = get_one_face(cv2.imread(source_path))
target_frame = cv2.imread(target_path)
result = process_frame(source_face, target_frame)
cv2.imwrite(output_path, result)
def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
modules.processors.frame.core.process_video(source_path, temp_frame_paths, process_frames)