線上RAG應用pdf文檔頻繁更新,老板下了死命令要節(jié)省預算,不能重復做embedding,我這么做..... 原創(chuàng)
我們最近在一個項目中遇到了一個問題。項目的場景是這樣的:用戶將他們的PDF文檔存儲在磁盤的某個特定目錄中,然后有一個定時任務來掃描此目錄并從中的PDF文檔構建知識庫。
一開始,我們采用"增量更新"策略。在掃描目錄中的文檔時,我們會對每個文檔進行哈希運算以生成其指紋,并檢查該指紋是否已存在于數(shù)據(jù)庫中。如果指紋不存在,就表示這是一個新文件,我們會對新文件的document做embedding,然后將其加入到知識庫中。
然而,這種方法存在一個問題。如果同一文件進行了增量添加,例如我們已經將A.pdf文件加入到了知識庫,但后來這個文件添加了新的內容。當我們重新計算其指紋并在數(shù)據(jù)庫中查找時,由于指紋不存在,我們會將這個更新過的文件作為新文件處理,并重新做embedding加入到知識庫。這樣一來,對于未更新的部分,知識庫會有兩份相同的數(shù)據(jù)記錄,第二份相同的記錄可能會"占據(jù)"原本應該被召回的數(shù)據(jù)記錄的位置,從而降低問答效果。
那么應該怎么解決這個問題呢?對于增量更新,做hash指紋這一點毋庸置疑,但是hash的對象不能是文件了,而應該聚焦于真實存到知識庫的數(shù)據(jù): document.
在這里,我們將查看使用LangChain index API的基本索引工作流。
index API允許您將來自任何源的文檔加載到矢量存儲中并保持同步。具體來說,它有助于:
- 避免將重復的內容寫入vector存儲
- 避免重寫未更改的內容
- 避免在未更改的內容上重新計算embedding
所有這些都可以節(jié)省你的時間和金錢,并改善你的矢量搜索結果。
如何工作
LangChain索引使用記錄管理器(RecordManager)來跟蹤寫入矢量存儲的文檔。
當索引內容時,為每個文檔計算哈希值,并將以下信息存儲在記錄管理器中:
- 文檔hash(頁面內容和元數(shù)據(jù)的散列)
- 寫時間
- 源id——每個文檔應該在其元數(shù)據(jù)中包含信息,以便我們確定該文檔的最終來源
刪除模式
將文檔索引到矢量存儲時,可能會刪除矢量存儲中的一些現(xiàn)有文檔。在某些情況下,您可能希望刪除與正在索引的新文檔來自相同來源的所有現(xiàn)有文檔。在其他情況下,您可能希望批量刪除所有現(xiàn)有文檔。索引API刪除模式可以讓你選擇你想要的行為:
Cleanup Mode | De-Duplicates Content | Parallelizable | Cleans Up Deleted Source Docs | Cleans Up Mutations of Source Docs and/or Derived Docs | Clean Up Timing |
None | ? | ? | ? | ? | - |
Incremental | ? | ? | ? | ? | Continuously |
Full | ? | ? | ? | ? | At end of indexing |
快速開始
首先,需要明確的是,無論使用何種清理模式,index函數(shù)都會自動去重。也就是說,調用index([doc1, doc1, doc2])的效果等同于調用index([doc1, doc2])。然而,在我們的實際應用場景中,情況并不完全如此。
可能在第一次運行時,我們對[doc1, doc2]進行了索引操作,而在下次定時任務執(zhí)行時,我們又對[doc1, doc3]進行了索引。換言之,我們從源文檔中刪除了一部分內容,并添加了一些新的內容。這才是我們真正面臨的場景:我們希望保持doc1不變,新增doc3,并能夠自動刪除doc2。這種需求可以通過Incremental增量模式得到滿足。
話不多說,我們來看看三種模式的使用效果吧。
None
None模式的功能可以理解為去重和添加,而不包括刪除。例如,如果你首次調用index([doc1, doc2]),然后再次調用index([doc1, doc3]),那么在向量庫中的數(shù)據(jù)就會是[doc1, doc2, doc3]。需要注意的是,這種模式下,舊版本的doc2并不會被刪除。
from langchain.embeddings import OpenAIEmbeddings
from langchain.schema import Document
from langchain.vectorstores.elasticsearch import ElasticsearchStore
from langchain.indexes import SQLRecordManager, index
collection_name = "test_index"
embedding = OpenAIEmbeddings()
vectorstore = ElasticsearchStore(
es_url="http://localhost:9200",
index_name="test_index",
embedding=embedding)
namespace = f"elasticsearch/{collection_name}"
record_manager = SQLRecordManager(
namespace, db_url="sqlite:///record_manager_cache.sql"
)
# record_manager.create_schema()
doc1 = Document(page_content="kitty", metadata={"source": "kitty.txt"})
doc2 = Document(page_content="doggy", metadata={"source": "doggy.txt"})
doc3 = Document(page_content="doggy1", metadata={"source": "doggy.txt"})
def _clear():
"""Hacky helper method to clear content. See the `full` mode section to to understand why it works."""
index(
[],
record_manager,
vectorstore,
cleanup="full",
source_id_key="source")
_clear()
res = index(
[doc1, doc1, doc2],
record_manager,
vectorstore,
cleanup=None,
source_id_key="source",
)
print(res)
得到的結果:
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
我們發(fā)現(xiàn)做了去重并且?guī)臀覀冊黾恿藘蓷l數(shù)據(jù)。
然后我們再執(zhí)行index操作:
res = index(
[doc1, doc3],
record_manager,
vectorstore,
cleanup=None,
source_id_key="source",
)
print(res)
執(zhí)行結果發(fā)現(xiàn)添加了doc3, 跳過了doc1, doc2 還在數(shù)據(jù)庫記錄里:
{'num_added': 1, 'num_updated': 0, 'num_skipped': 1, 'num_deleted': 0}
full
full 含義是用戶應該將所有需要進行索引的全部內容傳遞給index函數(shù),任何沒有傳遞到索引函數(shù)并且存在于vectorstore中的文檔將被刪除! 此行為對于處理源文檔的刪除非常有用。我們還是使用上面的代碼,這次只是把模式換成 full. 首先,我們需要重置并清空數(shù)據(jù),這可以通過調用??_clear()?
?函數(shù)實現(xiàn)。
res = index(
[doc1, doc1, doc2],
record_manager,
vectorstore,
cleanup="full",
source_id_key="source",
)
print(res)
我們發(fā)現(xiàn)添加了2個文檔:
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
接著我們執(zhí)行:
res = index(
[doc1, doc3],
record_manager,
vectorstore,
cleanup="full",
source_id_key="source",
)
print(res)
我們發(fā)現(xiàn)添加了一個文檔doc3,跳過了一個文檔doc1,刪除了一個文檔doc2:
{'num_added': 1, 'num_updated': 0, 'num_skipped': 1, 'num_deleted': 1}
incremental
"增量模式"是我們最常用的一種。顧名思義,這種模式主要進行增量操作,即添加最新記錄并刪除舊版記錄。在這種模式下,如果我們傳入一個空的文檔數(shù)組,即index([]),將不會發(fā)生任何操作。然而,如果我們在"全量模式"下傳入同樣的空數(shù)組,系統(tǒng)則會清除所有數(shù)據(jù)。
首先,執(zhí)行以下操作:
_clear()
res = index(
[doc1, doc1, doc2],
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
print(res)
res = index(
[doc1, doc3],
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
print(res)
得到的結果如下:
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
{'num_added': 1, 'num_updated': 0, 'num_skipped': 1, 'num_deleted': 1}
可以看出,第一次操作添加了兩個文檔。在第二次操作中,系統(tǒng)跳過了doc1,并刪除了之前屬于"doggy.txt"的doc2,因為現(xiàn)在我們只傳入了doc3。因此,增量模式會將這個舊版本(doc2)刪除。
然后執(zhí)行以下操作:
res = index(
[doc1],
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
print(res)
這次對于"doggy.txt"沒有任何新的文檔被傳入,所以數(shù)據(jù)沒有任何改動,結果如下:
{'num_added': 0, 'num_updated': 0, 'num_skipped': 1, 'num_deleted': 0}
但是,如果我們只傳入doc2,則會發(fā)現(xiàn)系統(tǒng)增加了doc2,并刪除了同一源文件("doggy.txt")的doc3。結果如下:
{'num_added': 1, 'num_updated': 0, 'num_skipped':
源碼
def index(
docs_source: Union[BaseLoader, Iterable[Document]],
record_manager: RecordManager,
vector_store: VectorStore,
*,
batch_size: int = 100,
cleanup: Literal["incremental", "full", None] = None,
source_id_key: Union[str, Callable[[Document], str], None] = None,
cleanup_batch_size: int = 1_000,
) -> IndexingResult:
...
if isinstance(docs_source, BaseLoader):
try:
doc_iterator = docs_source.lazy_load()
except NotImplementedError:
doc_iterator = iter(docs_source.load())
else:
doc_iterator = iter(docs_source)
source_id_assigner = _get_source_id_assigner(source_id_key)
# Mark when the update started.
index_start_dt = record_manager.get_time()
num_added = 0
num_skipped = 0
num_updated = 0
num_deleted = 0
for doc_batch in _batch(batch_size, doc_iterator):
hashed_docs = list(
_deduplicate_in_order(
[_HashedDocument.from_document(doc) for doc in doc_batch]
)
)
source_ids: Sequence[Optional[str]] = [
source_id_assigner(doc) for doc in hashed_docs
]
....
exists_batch = record_manager.exists([doc.uid for doc in hashed_docs])
# Filter out documents that already exist in the record store.
uids = []
docs_to_index = []
# 判斷哪些是要更新,哪些是要添加的
for hashed_doc, doc_exists in zip(hashed_docs, exists_batch):
if doc_exists:
# Must be updated to refresh timestamp.
record_manager.update([hashed_doc.uid], time_at_least=index_start_dt)
num_skipped += 1
continue
uids.append(hashed_doc.uid)
docs_to_index.append(hashed_doc.to_document())
# 知識入向量庫
if docs_to_index:
vector_store.add_documents(docs_to_index, ids=uids)
num_added += len(docs_to_index)
# 更新數(shù)據(jù)庫記錄時間
record_manager.update(
[doc.uid for doc in hashed_docs],
group_ids=source_ids,
time_at_least=index_start_dt,
)
# 根據(jù)時間和source_ids 清理舊版本數(shù)據(jù)
if cleanup == "incremental":
...
uids_to_delete = record_manager.list_keys(
group_ids=_source_ids, before=index_start_dt
)
if uids_to_delete:
vector_store.delete(uids_to_delete)
record_manager.delete_keys(uids_to_delete)
num_deleted += len(uids_to_delete)
if cleanup == "full":
while uids_to_delete := record_manager.list_keys(
before=index_start_dt, limit=cleanup_batch_size
):
# First delete from record store.
vector_store.delete(uids_to_delete)
# Then delete from record manager.
record_manager.delete_keys(uids_to_delete)
num_deleted += len(uids_to_delete)
return {
"num_added": num_added,
"num_updated": num_updated,
"num_skipped": num_skipped,
"num_deleted": num_deleted,
}
通過上述代碼,我們可以了解到一個常見的優(yōu)化策略:對于涉及大量數(shù)據(jù)操作的數(shù)據(jù)庫和向量庫,我們通常使用批處理(batch)方式進行操作。上面代碼的流程圖如下:
本文轉載自公眾號AI 博物院 作者:longyunfeigu
