Spaces:
Runtime error
Runtime error
File size: 2,942 Bytes
63deadc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
from inspect import signature
from typing import List, Optional, Sequence, Union
from langchain_core.callbacks.manager import Callbacks
from langchain_core.documents import (
BaseDocumentCompressor,
BaseDocumentTransformer,
Document,
)
class DocumentCompressorPipeline(BaseDocumentCompressor):
"""Document compressor that uses a pipeline of Transformers."""
transformers: List[Union[BaseDocumentTransformer, BaseDocumentCompressor]]
"""List of document filters that are chained together and run in sequence."""
class Config:
"""Configuration for this pydantic object."""
arbitrary_types_allowed = True
def compress_documents(
self,
documents: Sequence[Document],
query: str,
callbacks: Optional[Callbacks] = None,
) -> Sequence[Document]:
"""Transform a list of documents."""
for _transformer in self.transformers:
if isinstance(_transformer, BaseDocumentCompressor):
accepts_callbacks = (
signature(_transformer.compress_documents).parameters.get(
"callbacks"
)
is not None
)
if accepts_callbacks:
documents = _transformer.compress_documents(
documents, query, callbacks=callbacks
)
else:
documents = _transformer.compress_documents(documents, query)
elif isinstance(_transformer, BaseDocumentTransformer):
documents = _transformer.transform_documents(documents)
else:
raise ValueError(f"Got unexpected transformer type: {_transformer}")
return documents
async def acompress_documents(
self,
documents: Sequence[Document],
query: str,
callbacks: Optional[Callbacks] = None,
) -> Sequence[Document]:
"""Compress retrieved documents given the query context."""
for _transformer in self.transformers:
if isinstance(_transformer, BaseDocumentCompressor):
accepts_callbacks = (
signature(_transformer.acompress_documents).parameters.get(
"callbacks"
)
is not None
)
if accepts_callbacks:
documents = await _transformer.acompress_documents(
documents, query, callbacks=callbacks
)
else:
documents = await _transformer.acompress_documents(documents, query)
elif isinstance(_transformer, BaseDocumentTransformer):
documents = await _transformer.atransform_documents(documents)
else:
raise ValueError(f"Got unexpected transformer type: {_transformer}")
return documents
|