自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

線上RAG應用pdf文檔頻繁更新,老板下了死命令要節(jié)省預算,不能重復做embedding,我這么做..... 原創(chuàng)

發(fā)布于 2024-12-3 08:51
瀏覽
0收藏

我們最近在一個項目中遇到了一個問題。項目的場景是這樣的:用戶將他們的PDF文檔存儲在磁盤的某個特定目錄中,然后有一個定時任務來掃描此目錄并從中的PDF文檔構建知識庫。

一開始,我們采用"增量更新"策略。在掃描目錄中的文檔時,我們會對每個文檔進行哈希運算以生成其指紋,并檢查該指紋是否已存在于數(shù)據(jù)庫中。如果指紋不存在,就表示這是一個新文件,我們會對新文件的document做embedding,然后將其加入到知識庫中。

然而,這種方法存在一個問題。如果同一文件進行了增量添加,例如我們已經將A.pdf文件加入到了知識庫,但后來這個文件添加了新的內容。當我們重新計算其指紋并在數(shù)據(jù)庫中查找時,由于指紋不存在,我們會將這個更新過的文件作為新文件處理,并重新做embedding加入到知識庫。這樣一來,對于未更新的部分,知識庫會有兩份相同的數(shù)據(jù)記錄,第二份相同的記錄可能會"占據(jù)"原本應該被召回的數(shù)據(jù)記錄的位置,從而降低問答效果。

那么應該怎么解決這個問題呢?對于增量更新,做hash指紋這一點毋庸置疑,但是hash的對象不能是文件了,而應該聚焦于真實存到知識庫的數(shù)據(jù): document.

在這里,我們將查看使用LangChain index API的基本索引工作流。

index API允許您將來自任何源的文檔加載到矢量存儲中并保持同步。具體來說,它有助于:

  • 避免將重復的內容寫入vector存儲
  • 避免重寫未更改的內容
  • 避免在未更改的內容上重新計算embedding

所有這些都可以節(jié)省你的時間和金錢,并改善你的矢量搜索結果。

如何工作

LangChain索引使用記錄管理器(RecordManager)來跟蹤寫入矢量存儲的文檔。

當索引內容時,為每個文檔計算哈希值,并將以下信息存儲在記錄管理器中:

  • 文檔hash(頁面內容和元數(shù)據(jù)的散列)
  • 寫時間
  • 源id——每個文檔應該在其元數(shù)據(jù)中包含信息,以便我們確定該文檔的最終來源

刪除模式

將文檔索引到矢量存儲時,可能會刪除矢量存儲中的一些現(xiàn)有文檔。在某些情況下,您可能希望刪除與正在索引的新文檔來自相同來源的所有現(xiàn)有文檔。在其他情況下,您可能希望批量刪除所有現(xiàn)有文檔。索引API刪除模式可以讓你選擇你想要的行為:

Cleanup Mode

De-Duplicates Content

Parallelizable

Cleans Up Deleted Source Docs

Cleans Up Mutations of Source Docs and/or Derived Docs

Clean Up Timing

None

?

?

?

?

-

Incremental

?

?

?

?

Continuously

Full

?

?

?

?

At end of indexing

快速開始

首先,需要明確的是,無論使用何種清理模式,index函數(shù)都會自動去重。也就是說,調用index([doc1, doc1, doc2])的效果等同于調用index([doc1, doc2])。然而,在我們的實際應用場景中,情況并不完全如此。

可能在第一次運行時,我們對[doc1, doc2]進行了索引操作,而在下次定時任務執(zhí)行時,我們又對[doc1, doc3]進行了索引。換言之,我們從源文檔中刪除了一部分內容,并添加了一些新的內容。這才是我們真正面臨的場景:我們希望保持doc1不變,新增doc3,并能夠自動刪除doc2。這種需求可以通過Incremental增量模式得到滿足。

話不多說,我們來看看三種模式的使用效果吧。

None

None模式的功能可以理解為去重和添加,而不包括刪除。例如,如果你首次調用index([doc1, doc2]),然后再次調用index([doc1, doc3]),那么在向量庫中的數(shù)據(jù)就會是[doc1, doc2, doc3]。需要注意的是,這種模式下,舊版本的doc2并不會被刪除。

from langchain.embeddings import OpenAIEmbeddings
from langchain.schema import Document
from langchain.vectorstores.elasticsearch import ElasticsearchStore
from langchain.indexes import SQLRecordManager, index


collection_name = "test_index"

embedding = OpenAIEmbeddings()

vectorstore = ElasticsearchStore(
    es_url="http://localhost:9200",
    index_name="test_index",
    embedding=embedding)

namespace = f"elasticsearch/{collection_name}"
record_manager = SQLRecordManager(
    namespace, db_url="sqlite:///record_manager_cache.sql"
)
# record_manager.create_schema()

doc1 = Document(page_content="kitty", metadata={"source": "kitty.txt"})
doc2 = Document(page_content="doggy", metadata={"source": "doggy.txt"})
doc3 = Document(page_content="doggy1", metadata={"source": "doggy.txt"})


def _clear():
    """Hacky helper method to clear content. See the `full` mode section to to understand why it works."""
    index(
        [],
        record_manager,
        vectorstore,
        cleanup="full",
        source_id_key="source")

_clear()

res = index(
    [doc1, doc1, doc2],
    record_manager,
    vectorstore,
    cleanup=None,
    source_id_key="source",
)
print(res)

得到的結果:

{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}

我們發(fā)現(xiàn)做了去重并且?guī)臀覀冊黾恿藘蓷l數(shù)據(jù)。

然后我們再執(zhí)行index操作:

res = index(
    [doc1, doc3],
    record_manager,
    vectorstore,
    cleanup=None,
    source_id_key="source",
)
print(res)

執(zhí)行結果發(fā)現(xiàn)添加了doc3, 跳過了doc1, doc2 還在數(shù)據(jù)庫記錄里:

{'num_added': 1, 'num_updated': 0, 'num_skipped': 1, 'num_deleted': 0}

線上RAG應用pdf文檔頻繁更新,老板下了死命令要節(jié)省預算,不能重復做embedding,我這么做.....-AI.x社區(qū)


full

full 含義是用戶應該將所有需要進行索引的全部內容傳遞給index函數(shù),任何沒有傳遞到索引函數(shù)并且存在于vectorstore中的文檔將被刪除! 此行為對于處理源文檔的刪除非常有用。我們還是使用上面的代碼,這次只是把模式換成 full. 首先,我們需要重置并清空數(shù)據(jù),這可以通過調用??_clear()??函數(shù)實現(xiàn)。

res = index(
    [doc1, doc1, doc2],
    record_manager,
    vectorstore,
    cleanup="full",
    source_id_key="source",
)
print(res)

我們發(fā)現(xiàn)添加了2個文檔:

{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}

接著我們執(zhí)行:

res = index(
    [doc1, doc3],
    record_manager,
    vectorstore,
    cleanup="full",
    source_id_key="source",
)
print(res)

我們發(fā)現(xiàn)添加了一個文檔doc3,跳過了一個文檔doc1,刪除了一個文檔doc2:

線上RAG應用pdf文檔頻繁更新,老板下了死命令要節(jié)省預算,不能重復做embedding,我這么做.....-AI.x社區(qū)

{'num_added': 1, 'num_updated': 0, 'num_skipped': 1, 'num_deleted': 1}

incremental

"增量模式"是我們最常用的一種。顧名思義,這種模式主要進行增量操作,即添加最新記錄并刪除舊版記錄。在這種模式下,如果我們傳入一個空的文檔數(shù)組,即index([]),將不會發(fā)生任何操作。然而,如果我們在"全量模式"下傳入同樣的空數(shù)組,系統(tǒng)則會清除所有數(shù)據(jù)。

首先,執(zhí)行以下操作:

_clear()

res = index(
    [doc1, doc1, doc2],
    record_manager,
    vectorstore,
    cleanup="incremental",
    source_id_key="source",
)
print(res)

res = index(
    [doc1, doc3],
    record_manager,
    vectorstore,
    cleanup="incremental",
    source_id_key="source",
)
print(res)

得到的結果如下:

{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
{'num_added': 1, 'num_updated': 0, 'num_skipped': 1, 'num_deleted': 1}

可以看出,第一次操作添加了兩個文檔。在第二次操作中,系統(tǒng)跳過了doc1,并刪除了之前屬于"doggy.txt"的doc2,因為現(xiàn)在我們只傳入了doc3。因此,增量模式會將這個舊版本(doc2)刪除。

然后執(zhí)行以下操作:

res = index(
    [doc1],
    record_manager,
    vectorstore,
    cleanup="incremental",
    source_id_key="source",
)
print(res)

這次對于"doggy.txt"沒有任何新的文檔被傳入,所以數(shù)據(jù)沒有任何改動,結果如下:

{'num_added': 0, 'num_updated': 0, 'num_skipped': 1, 'num_deleted': 0}

但是,如果我們只傳入doc2,則會發(fā)現(xiàn)系統(tǒng)增加了doc2,并刪除了同一源文件("doggy.txt")的doc3。結果如下:

{'num_added': 1, 'num_updated': 0, 'num_skipped':

源碼

def index(
    docs_source: Union[BaseLoader, Iterable[Document]],
    record_manager: RecordManager,
    vector_store: VectorStore,
    *,
    batch_size: int = 100,
    cleanup: Literal["incremental", "full", None] = None,
    source_id_key: Union[str, Callable[[Document], str], None] = None,
    cleanup_batch_size: int = 1_000,
) -> IndexingResult:
    ...

    if isinstance(docs_source, BaseLoader):
        try:
            doc_iterator = docs_source.lazy_load()
        except NotImplementedError:
            doc_iterator = iter(docs_source.load())
    else:
        doc_iterator = iter(docs_source)

    source_id_assigner = _get_source_id_assigner(source_id_key)

    # Mark when the update started.
    index_start_dt = record_manager.get_time()
    num_added = 0
    num_skipped = 0
    num_updated = 0
    num_deleted = 0
    
    
    for doc_batch in _batch(batch_size, doc_iterator):
        hashed_docs = list(
            _deduplicate_in_order(
                [_HashedDocument.from_document(doc) for doc in doc_batch]
            )
        )

        source_ids: Sequence[Optional[str]] = [
            source_id_assigner(doc) for doc in hashed_docs
        ]

        ....

        exists_batch = record_manager.exists([doc.uid for doc in hashed_docs])

        # Filter out documents that already exist in the record store.
        uids = []
        docs_to_index = []
        
        # 判斷哪些是要更新,哪些是要添加的
        for hashed_doc, doc_exists in zip(hashed_docs, exists_batch):
            if doc_exists:
                # Must be updated to refresh timestamp.
                record_manager.update([hashed_doc.uid], time_at_least=index_start_dt)
                num_skipped += 1
                continue
            uids.append(hashed_doc.uid)
            docs_to_index.append(hashed_doc.to_document())

      
        # 知識入向量庫
        if docs_to_index:
            vector_store.add_documents(docs_to_index, ids=uids)
            num_added += len(docs_to_index)

        # 更新數(shù)據(jù)庫記錄時間
        record_manager.update(
            [doc.uid for doc in hashed_docs],
            group_ids=source_ids,
            time_at_least=index_start_dt,
        )

        # 根據(jù)時間和source_ids 清理舊版本數(shù)據(jù)
        if cleanup == "incremental":
            ...

            uids_to_delete = record_manager.list_keys(
                group_ids=_source_ids, before=index_start_dt
            )
            if uids_to_delete:
                vector_store.delete(uids_to_delete)
                record_manager.delete_keys(uids_to_delete)
                num_deleted += len(uids_to_delete)

    if cleanup == "full":
        while uids_to_delete := record_manager.list_keys(
            before=index_start_dt, limit=cleanup_batch_size
        ):
            # First delete from record store.
            vector_store.delete(uids_to_delete)
            # Then delete from record manager.
            record_manager.delete_keys(uids_to_delete)
            num_deleted += len(uids_to_delete)

    return {
        "num_added": num_added,
        "num_updated": num_updated,
        "num_skipped": num_skipped,
        "num_deleted": num_deleted,
    }

通過上述代碼,我們可以了解到一個常見的優(yōu)化策略:對于涉及大量數(shù)據(jù)操作的數(shù)據(jù)庫和向量庫,我們通常使用批處理(batch)方式進行操作。上面代碼的流程圖如下:

線上RAG應用pdf文檔頻繁更新,老板下了死命令要節(jié)省預算,不能重復做embedding,我這么做.....-AI.x社區(qū)

本文轉載自公眾號AI 博物院 作者:longyunfeigu

原文鏈接:??https://mp.weixin.qq.com/s/BFgtECvWMnUSDxH-JswbyQ??

?著作權歸作者所有,如需轉載,請注明出處,否則將追究法律責任
收藏
回復
舉報
回復
相關推薦