# ---------------------------------------------------------------------- | |
# IMPORTS | |
# ---------------------------------------------------------------------- | |
import io | |
import sys | |
import time | |
import logging | |
from contextlib import redirect_stdout | |
from typing import List, Callable | |
from src.utils import ProcessingContext | |
# ---------------------------------------------------------------------- | |
# PIPELINE EXECUTION | |
# ---------------------------------------------------------------------- | |
def run_functions_in_sequence(contexts: List[ProcessingContext], pipeline_steps: List[Callable]) -> None: | |
for func in pipeline_steps: | |
if all(context.skip_run or context.skip_processing for context in contexts): | |
break | |
logging.info(f"Executing pipeline step: {func.__name__}") | |
stdout_buffer = io.StringIO() | |
with redirect_stdout(stdout_buffer): | |
func(contexts) | |
sys.stdout.write(stdout_buffer.getvalue()) | |
sys.stdout.flush() | |
time.sleep(1) | |