基于Agent的金融問(wèn)答系統(tǒng):RAG的檢索增強(qiáng)之ElasticSearch 原創(chuàng)
前言
本章內(nèi)容,我們將在已經(jīng)構(gòu)建的agent框架基礎(chǔ)上,優(yōu)化檢索器,為檢索器搭建ElasticSearch服務(wù),實(shí)現(xiàn)問(wèn)答系統(tǒng)的檢索增強(qiáng)。
檢索問(wèn)題
通過(guò)測(cè)試天池大賽數(shù)據(jù)集的前100個(gè)問(wèn)題,我們發(fā)現(xiàn)有很多問(wèn)題RAG檢索不到,例如:
? {"id": 34, "question": "根據(jù)武漢興圖新科電子股份有限公司招股意向書(shū),電子信息行業(yè)的上游涉及哪些企業(yè)?"}
通過(guò)查看日志,檢索器沒(méi)有檢索到相關(guān)信息:
優(yōu)化方案
分析上述case原因,檢索器太過(guò)簡(jiǎn)單所致。
class SimpleRetrieverWrapper():
"""自定義檢索器實(shí)現(xiàn)"""
def__init__(self, store, llm, **kwargs):
self.store = store
self.llm = llm
logger.info(f'檢索器所使用的Chat模型:{self.llm}')
defcreate_retriever(self):
logger.info(f'初始化自定義的Retriever')
chromadb_retriever = self.store.as_retriever()
return chromadb_retriever
基于以上問(wèn)題,我們計(jì)劃使用集成檢索器,方案如下:
說(shuō)明:
? 將檢索器改為使用??EnsembleRetriever?
?
? 集成檢索器其中之一使用??ElasticSearch?
?? 檢索器,這個(gè)檢索器通過(guò)連接??ElasticSearch?
? 服務(wù),通過(guò)關(guān)鍵字查詢(xún)相關(guān)信息。
? 集成檢索器另外一個(gè)使用??MultiQueryRetriever?
? 檢索器,這個(gè)檢索器通過(guò)連接Chroma向量庫(kù)查詢(xún)信息。
關(guān)于MultiQueryRetriever和ElasticSearch,之前有文章做過(guò)基本內(nèi)容的總結(jié),詳情請(qǐng)查看課程總結(jié)】day29:大模型之深入了解Retrievers解析器。
優(yōu)化步驟
1、搭建ES服務(wù)
第一步:安裝Docker,該內(nèi)容不再贅述,具體請(qǐng)見(jiàn)10分鐘學(xué)會(huì)Docker的安裝和使用
第二步:創(chuàng)建網(wǎng)絡(luò)
docker network create es-net
第三步:拉取鏡像
docker pull elasticsearch:8.6.0
第四步:創(chuàng)建掛載點(diǎn)目錄
smart-finance-bot \
|- app \
|- docker \
|- elasticsearch \ # 創(chuàng)建elasticsearch掛載目錄
|- data \ # 創(chuàng)建數(shù)據(jù)目錄
|- config \ # 創(chuàng)建配置目錄
|- plugins \ # 創(chuàng)建插件目錄
第五步:命令行中輸入命令啟動(dòng)Docker容器
docker run -d \
--restart=always \
--name es \
--network es-net \
-p 9200:9200 \
-p 9300:9300 \
--privileged \
-v /Users/deadwalk/Code/smart-finance-bot/docker/elasticsearch/data:/usr/share/elasticsearch/data \
-v /Users/deadwalk/Code/smart-finance-bot/docker/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
-e "discovery.type=single-node" \
-e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \
elasticsearch:8.6.0
注意:
? 上述的??/Users/deadwalk/Code/smart-finance-bot?
? 請(qǐng)根據(jù)本地路徑修改;
? 運(yùn)行完畢后請(qǐng)使用??docker ps?
?確認(rèn)容器已經(jīng)啟動(dòng)。
第六步:進(jìn)入es容器
docker exec -it es /bin/bash
第七步:命令行輸入重置密碼命令(此處我們重置密碼為123abc)
bin/elasticsearch-reset-password -i -u elastic
第八步:使用瀏覽器訪問(wèn)http://localhost:9200/,驗(yàn)證服務(wù)可以使用
2、添加數(shù)據(jù)到ES服務(wù)
2.1、測(cè)試ES的連接
編寫(xiě)ES連接測(cè)試代碼,驗(yàn)證ES服務(wù)連接。
def test_es_connect():
from elasticsearch importElasticsearch
ELASTIC_PASSWORD ="123abc"
host ="localhost"
port =9200
schema ="https"
url =f"{schema}://elastic:{ELASTIC_PASSWORD}@{host}:{port}"
client =Elasticsearch(
url,
verify_certs=False,
)
print(client.info())
運(yùn)行結(jié)果:
2.2、實(shí)現(xiàn)ElasticSearch連接代碼
代碼文件:??app/rag/elasticsearch_db.py?
?
# 引入
from langchain_core.retrievers importBaseRetriever
from langchain_core.documents importDocument
# ES需要導(dǎo)入的庫(kù)
from typing importList
import re
import jieba
import nltk
from nltk.corpus import stopwords
import time
from elasticsearch importElasticsearch
from elasticsearch.exceptions importConnectionError,AuthenticationException
from elasticsearch import helpers
import settings
from utils.logger_config importLoggerManager
from utils.util_nltk importUtilNltk
import os
import warnings
warnings.simplefilter("ignore")# 屏蔽 ES 的一些Warnings
utilnltk =UtilNltk()
logger =LoggerManager().logger
classTraditionDB:
defadd_documents(self, docs):
"""
將文檔添加到數(shù)據(jù)庫(kù)
"""
raiseNotImplementedError("Subclasses should implement this method!")
defget_store(self):
"""
獲得向量數(shù)據(jù)庫(kù)的對(duì)象實(shí)例
"""
raiseNotImplementedError("Subclasses should implement this method!")
classElasticsearchDB(TraditionDB):
def__init__(self,
schema=settings.ELASTIC_SCHEMA,
host=settings.ELASTIC_HOST,
port=settings.ELASTIC_PORT,
index_name=settings.ELASTIC_INDEX_NAME,
k=3
# docs=docs
):
# 定義索引名稱(chēng)
self.index_name = index_name
self.k = k
try:
url =f"{schema}://elastic:{settings.ELASTIC_PASSWORD}@{host}:{port}"
logger.info(f'初始化ES服務(wù)連接:{url}')
self.es =Elasticsearch(
url,
verify_certs=False,
# ca_certs="./docker/elasticsearch/certs/ca/ca.crt",
# basic_auth=("elastic", settings.ELASTIC_PASSWORD)
)
response = self.es.info()# 嘗試獲取信息
logger.info(f'ES服務(wù)響應(yīng): {response}')
except(ConnectionError,AuthenticationException)as e:
logger.error(f'連接 Elasticsearch 失敗: {e}')
raise
exceptExceptionas e:
logger.error(f'發(fā)生其他錯(cuò)誤: {e}')
logger.error(f'異常類(lèi)型: {type(e).__name__}')# 記錄異常類(lèi)型
raise
defto_keywords(self, input_string):
"""將句子轉(zhuǎn)成檢索關(guān)鍵詞序列"""
# 按搜索引擎模式分詞
word_tokens = jieba.cut_for_search(input_string)
# 加載停用詞表
stop_words =set(stopwords.words('chinese'))
# 去除停用詞
filtered_sentence =[w for w in word_tokens ifnot w in stop_words]
return' '.join(filtered_sentence)
defsent_tokenize(self, input_string):
"""按標(biāo)點(diǎn)斷句,沒(méi)有用到"""
# 按標(biāo)點(diǎn)切分
sentences = re.split(r'(?<=[。?。?;?!])', input_string)
# 去掉空字符串
return[sentence for sentence in sentences if sentence.strip()]
defcreate_index(self):
"""如果索引不存在,則創(chuàng)建索引"""
ifnot self.es.indices.exists(index=self.index_name):
# 創(chuàng)建索引
self.es.indices.create(index=self.index_name, ignore=400)
defbluk_data(self, paragraphs):
"""批量進(jìn)行數(shù)據(jù)灌庫(kù)"""
# 灌庫(kù)指令
actions =[
{
"_index": self.index_name,
"_source":{
"keywords": self.to_keywords(para.page_content),
"text": para.page_content
}
}
for para in paragraphs
]
# 文本灌庫(kù)
helpers.bulk(self.es, actions)
# # 灌庫(kù)是異步的
# time.sleep(2)
defflush(self):
# 刷新數(shù)據(jù),數(shù)據(jù)入庫(kù)完成以后刷新數(shù)據(jù)
self.es.indices.flush()
defsearch(self, query_string):
"""關(guān)鍵詞檢索"""
# ES 的查詢(xún)語(yǔ)言
search_query ={
"match":{
"keywords": self.to_keywords(query_string)
}
}
res = self.es.search(index=self.index_name, query=search_query, size=self.k)
return[hit["_source"]["text"]for hit in res["hits"]["hits"]]
defdelete(self):
"""如果索引存在,則刪除索引"""
if self.es.indices.exists(index=self.index_name):
# 創(chuàng)建索引
self.es.indices.delete(index=self.index_name, ignore=400)
defadd_documents(self, docs):
self.bluk_data(docs)
self.flush()
說(shuō)明:
? elasticsearch后續(xù)的插入操作中,使用到了nltk分詞,其代碼已經(jīng)封裝在UtilNltk類(lèi)中,具體代碼請(qǐng)查看Github倉(cāng)庫(kù)代碼,本文不再贅述。
???LoggerManager?
?是代碼重構(gòu)時(shí),封裝的一個(gè)日志管理類(lèi),具體代碼請(qǐng)查看Github倉(cāng)庫(kù)代碼,本文不再贅述。
2.3、修改PDF文件導(dǎo)入代碼
在settings.py中添加elasticsearch配置信息:
"""
ES數(shù)據(jù)庫(kù)相關(guān)的配置
"""
# ES服務(wù)開(kāi)關(guān):True表示開(kāi)啟ES服務(wù),F(xiàn)alse表示關(guān)閉ES服務(wù)
ELASTIC_ENABLE_ES = True
ELASTIC_PASSWORD = os.getenv("ELASTIC_PASSWORD", "123abc")
ELASTIC_HOST = os.getenv("ELASTIC_HOST", "localhost")
ELASTIC_PORT = os.getenv("ELASTIC_PORT", 9200)
ELASTIC_SCHEMA = "https"
ELASTIC_INDEX_NAME = "smart_test_index"
確認(rèn)PDFProcessor.py中已經(jīng)添加了對(duì)于Elasticsearch的插入操作支持,具體代碼在【項(xiàng)目實(shí)戰(zhàn)】基于Agent的金融問(wèn)答系統(tǒng):代碼重構(gòu)已做介紹,所以本文不再贅述。
2.4、測(cè)試PDF文件導(dǎo)入代碼
在test_framework.py中添加如下代碼
def test_import_elasticsearch():
# from rag.elasticsearch_db import TraditionDB
from rag.elasticsearch_db importElasticsearchDB
from rag.pdf_processor importPDFProcessor
llm, chat, embed = settings.LLM, settings.CHAT, settings.EMBED
# 導(dǎo)入文件的文件目錄
directory ="./dataset/pdf"
# 創(chuàng)建 Elasticsearch 數(shù)據(jù)庫(kù)實(shí)例
es_db =ElasticsearchDB()
# 創(chuàng)建 PDFProcessor 實(shí)例
pdf_processor =PDFProcessor(directory=directory,
db_type="es",
es_client=es_db,
embed=embed)
# 處理 PDF 文件
pdf_processor.process_pdfs()
運(yùn)行結(jié)果:
3、修改檢索器增加Elasticsearch檢索
代碼文件:??app/rag/retrievers.py?
?
from langchain_core.callbacks importCallbackManagerForRetrieverRun
from utils.logger_config importLoggerManager
from langchain_core.retrievers importBaseRetriever
from langchain_core.documents importDocument
from langchain.retrievers importEnsembleRetriever
from langchain.retrievers.multi_query importMultiQueryRetriever
from rag.elasticsearch_db importElasticsearchDB
# ES需要導(dǎo)入的庫(kù)
from typing importList
import logging
import settings
logger =LoggerManager().logger
classSimpleRetrieverWrapper():
"""自定義檢索器實(shí)現(xiàn)"""
def__init__(self, store, llm, **kwargs):
self.store = store
self.llm = llm
logger.info(f'檢索器所使用的Chat模型:{self.llm}')
defcreate_retriever(self):
logger.info(f'初始化自定義的Retriever')
# 初始化一個(gè)空的檢索器列表
retrievers =[]
weights =[]
# Step1:創(chuàng)建一個(gè) 多路召回檢索器 MultiQueryRetriever
chromadb_retriever = self.store.as_retriever()
mq_retriever =MultiQueryRetriever.from_llm(retriever=chromadb_retriever, llm=self.llm)
retrievers.append(mq_retriever)
weights.append(0.5)
logger.info(f'已啟用 MultiQueryRetriever')
# Step2:創(chuàng)建一個(gè) ES 檢索器
if settings.ELASTIC_ENABLE_ES isTrue:
es_retriever =ElasticsearchRetriever()
retrievers.append(es_retriever)
weights.append(0.5)
logger.info(f'已啟用 ElasticsearchRetriever')
# 使用集成檢索器,將所有啟用的檢索器集合在一起
ensemble_retriever =EnsembleRetriever(retrievers=retrievers, weights=weights)
return ensemble_retriever
classElasticsearchRetriever(BaseRetriever):
def_get_relevant_documents(self, query: str, )->List[Document]:
"""Return the first k documents from the list of documents"""
es_connector =ElasticsearchDB()
query_result = es_connector.search(query)
logger.info(f"ElasticSearch檢索到資料文件個(gè)數(shù):{len(query_result)}")
if query_result:
return[Document(page_content=doc)for doc in query_result]
return[]
asyncdef_aget_relevant_documents(self, query: str)->List[Document]:
"""(Optional) async native implementation."""
es_connector =ElasticsearchDB()
query_result = es_connector.search(query)
if query_result:
return[Document(page_content=doc)for doc in query_result]
return []
4、測(cè)試驗(yàn)證
在test_framework.py中運(yùn)行test_financebot_ex()函數(shù),測(cè)試檢索功能。
def test_financebot_ex():
from finance_bot_ex import FinanceBotEx
# 使用Chroma 的向量庫(kù)
financebot = FinanceBotEx()
example_query = "根據(jù)武漢興圖新科電子股份有限公司招股意向書(shū),電子信息行業(yè)的上游涉及哪些企業(yè)?"
financebot.handle_query(example_query)
運(yùn)行結(jié)果: 連接ES后檢索到3個(gè)資料文件
使用多路召回,生成3個(gè)檢索問(wèn)題
最終通過(guò)集成檢索器檢索到答案
優(yōu)化效果
通過(guò)對(duì)天池大賽前100個(gè)問(wèn)題的對(duì)比測(cè)試,我們最終得到如下對(duì)比驗(yàn)證結(jié)果:
內(nèi)容小結(jié)
- 集成檢索器:
可以有效提高檢索的效率,同時(shí)可以增加檢索的準(zhǔn)確度。
可以添加多個(gè)檢索器并配置不同的權(quán)重,以實(shí)現(xiàn)靈活的組合。
- Elasticsearch
作為傳統(tǒng)搜索引擎,可以通過(guò)keyword_search檢索到相關(guān)內(nèi)容。
使用時(shí)需要使用Docker搭建ES服務(wù)。
數(shù)據(jù)文件需要添加到ES服務(wù)中,方便檢索。
- MultiQueryRetriever
多路召回,將問(wèn)題拆分成多個(gè)問(wèn)題,然后進(jìn)行檢索,最終合并結(jié)果。
本文轉(zhuǎn)載自公眾號(hào)一起AI技術(shù) 作者:Dongming
