GitHub Actions
Deploy to Hugging Face Space: product-image-update-port-10
18faf97
# ----------------------------------------------------------------------
# IMPORTS
# ----------------------------------------------------------------------
import io
import sys
import time
import logging
from contextlib import redirect_stdout
from typing import List, Callable
from src.utils import ProcessingContext
# ----------------------------------------------------------------------
# PIPELINE EXECUTION
# ----------------------------------------------------------------------
def run_functions_in_sequence(contexts: List[ProcessingContext], pipeline_steps: List[Callable]) -> None:
for func in pipeline_steps:
if all(context.skip_run or context.skip_processing for context in contexts):
break
logging.info(f"Executing pipeline step: {func.__name__}")
stdout_buffer = io.StringIO()
with redirect_stdout(stdout_buffer):
func(contexts)
sys.stdout.write(stdout_buffer.getvalue())
sys.stdout.flush()
time.sleep(1)