File size: 3,532 Bytes
53ea588
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: BSD 2-Clause License

"""Example bot demonstrating how to use the tracing utilities."""

import asyncio
import logging
import uuid

from fastapi import FastAPI
from opentelemetry import metrics, trace
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameProcessor

from nvidia_pipecat.utils.tracing import AttachmentStrategy, traceable, traced

app = FastAPI()

tracer = trace.get_tracer("opentelemetry-pipecat-example")

meter = metrics.get_meter("opentelemetry-pipecat-example")

logger = logging.getLogger("opentelemetry")
logger.setLevel(logging.DEBUG)


@traceable
class DummyProcessor(FrameProcessor):
    """Example processor demonstrating how to use the tracing utilities."""

    @traced(attachment_strategy=AttachmentStrategy.NONE)
    async def process_frame(self, frame, direction):
        """Process a frame."""
        await super().process_frame(frame, direction)
        trace.get_current_span().add_event("Before inner")
        with tracer.start_as_current_span("inner") as span:
            span.add_event("inner event")
            await self.child()
            await self.linked()
            await self.none()
        trace.get_current_span().add_event("After inner")
        async for f in self.generator():
            print(f"{f}")
        await super().push_frame(frame, direction)

    @traced
    async def child(self):
        """Example method for the DummyProcessor."""
        # This span is attached as CHILD meaning that it will
        # be attached to the class span if no parent or to its
        # parent otherwise.
        trace.get_current_span().add_event("child")

    @traced(attachment_strategy=AttachmentStrategy.LINK)
    async def linked(self):
        """Example method for the DummyProcessor."""
        # This span is attached as LINK meaning it will be attached
        # to the class span but linked to its parent.
        trace.get_current_span().add_event("linked")

    @traced(attachment_strategy=AttachmentStrategy.NONE)
    async def none(self):
        """Example method for the DummyProcessor."""
        # This span is attached as NONE meaning it will be attached
        # to the class span even if nested under another span.
        trace.get_current_span().add_event("none")

    @traced
    async def generator(self):
        """Example method for the DummyProcessor."""
        yield TextFrame("Hello, ")
        trace.get_current_span().add_event("generated!")
        yield TextFrame("World")


async def main():
    """Main function of the bot."""
    with tracer.start_as_current_span("pipeline-root-span") as span:
        span.set_attribute("stream_id", str(uuid.uuid4()))
        logger.info("Started building pipeline")
        dummy = DummyProcessor()
        logger.info("Built dummy processor")
        pipeline = Pipeline([dummy])
        task = PipelineTask(pipeline)
        await task.queue_frame(TextFrame("Hello, "))
        await task.queue_frame(TextFrame("World"))
        await task.stop_when_done()
        logger.info("Built pipeline task")
        logger.info("Starting pipeline...")
        runner = PipelineRunner(handle_sigint=False)
        await runner.run(task)


if __name__ == "__main__":
    asyncio.run(main())