fciannella's picture
Working with service run on 7860
53ea588
raw
history blame
3.53 kB
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: BSD 2-Clause License
"""Example bot demonstrating how to use the tracing utilities."""
import asyncio
import logging
import uuid
from fastapi import FastAPI
from opentelemetry import metrics, trace
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameProcessor
from nvidia_pipecat.utils.tracing import AttachmentStrategy, traceable, traced
app = FastAPI()
tracer = trace.get_tracer("opentelemetry-pipecat-example")
meter = metrics.get_meter("opentelemetry-pipecat-example")
logger = logging.getLogger("opentelemetry")
logger.setLevel(logging.DEBUG)
@traceable
class DummyProcessor(FrameProcessor):
"""Example processor demonstrating how to use the tracing utilities."""
@traced(attachment_strategy=AttachmentStrategy.NONE)
async def process_frame(self, frame, direction):
"""Process a frame."""
await super().process_frame(frame, direction)
trace.get_current_span().add_event("Before inner")
with tracer.start_as_current_span("inner") as span:
span.add_event("inner event")
await self.child()
await self.linked()
await self.none()
trace.get_current_span().add_event("After inner")
async for f in self.generator():
print(f"{f}")
await super().push_frame(frame, direction)
@traced
async def child(self):
"""Example method for the DummyProcessor."""
# This span is attached as CHILD meaning that it will
# be attached to the class span if no parent or to its
# parent otherwise.
trace.get_current_span().add_event("child")
@traced(attachment_strategy=AttachmentStrategy.LINK)
async def linked(self):
"""Example method for the DummyProcessor."""
# This span is attached as LINK meaning it will be attached
# to the class span but linked to its parent.
trace.get_current_span().add_event("linked")
@traced(attachment_strategy=AttachmentStrategy.NONE)
async def none(self):
"""Example method for the DummyProcessor."""
# This span is attached as NONE meaning it will be attached
# to the class span even if nested under another span.
trace.get_current_span().add_event("none")
@traced
async def generator(self):
"""Example method for the DummyProcessor."""
yield TextFrame("Hello, ")
trace.get_current_span().add_event("generated!")
yield TextFrame("World")
async def main():
"""Main function of the bot."""
with tracer.start_as_current_span("pipeline-root-span") as span:
span.set_attribute("stream_id", str(uuid.uuid4()))
logger.info("Started building pipeline")
dummy = DummyProcessor()
logger.info("Built dummy processor")
pipeline = Pipeline([dummy])
task = PipelineTask(pipeline)
await task.queue_frame(TextFrame("Hello, "))
await task.queue_frame(TextFrame("World"))
await task.stop_when_done()
logger.info("Built pipeline task")
logger.info("Starting pipeline...")
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())