基于Agent的金融問(wèn)答系統(tǒng):RAG檢索模塊初建成 原創(chuàng)
前言
我們?cè)谏弦徽隆丁卷?xiàng)目實(shí)戰(zhàn)】基于Agent的金融問(wèn)答系統(tǒng)之項(xiàng)目簡(jiǎn)介》中簡(jiǎn)單介紹了項(xiàng)目背景以及數(shù)據(jù)集情況,本章將介紹RAG模塊的實(shí)現(xiàn)。
功能列表
參考之前所學(xué)內(nèi)容《大模型之初識(shí)RAG》,我們需要實(shí)現(xiàn)如下功能:
- 向量庫(kù)的基礎(chǔ)功能
向量庫(kù)
數(shù)據(jù)入庫(kù)
- 文件導(dǎo)入
PDF文件的讀取
PDF文件的切分
調(diào)用向量庫(kù)接口入庫(kù)
- 文件檢索
連接向量庫(kù)
檢索器檢索文件
開(kāi)發(fā)過(guò)程
1、規(guī)劃工程文件
項(xiàng)目開(kāi)始之后,我們?nèi)绻軌蛞种谱≈苯訑]代碼的沖動(dòng),改為提前做好規(guī)劃,這會(huì)是一個(gè)好的習(xí)慣。 為此,我提前做了如下規(guī)劃:
代碼管理
- 代碼使用Git進(jìn)行管理,這樣后期多人協(xié)作時(shí)方便代碼更新、代碼Merge、沖突解決等。
- 由于國(guó)內(nèi)訪問(wèn)Github優(yōu)勢(shì)會(huì)ban,所以我們將代碼倉(cāng)庫(kù)放在Gitee上。
- 為代碼倉(cāng)庫(kù)起了一個(gè)響亮的名稱后,倉(cāng)庫(kù)地址定為https://gitee.com/deadwalk/smart-finance-bot
項(xiàng)目目錄
考慮這個(gè)項(xiàng)目會(huì)涉及到前端、后端、數(shù)據(jù)集、模型等,所以項(xiàng)目目錄規(guī)劃如下:
smart-finance-bot \
|- dataset \ # 該目錄用于保存PDF以及SQLite數(shù)據(jù)
|- doc \ # 該目錄用于保存文檔類文件,例如:需求文檔、說(shuō)明文檔、數(shù)據(jù)文檔
|- app \ # 該目錄用于服務(wù)端代碼
|- agent \ # 該目錄用于保存agent相關(guān)代碼
|- rag \ # 該目錄用于保存rag相關(guān)代碼
|-test \ # 該目錄用于保存測(cè)試類驅(qū)動(dòng)相關(guān)代碼
|- conf \ # 該目錄用于保存配置文件
|-.qwen # 該文件保存QWen的配置文件(請(qǐng)自行添加對(duì)應(yīng)的API KEY)
|-.ernie # 該文件保存百度千帆的配置文件(請(qǐng)自行添加對(duì)應(yīng)的API KEY)
|- chatweb \ # 該目錄用于保存前端頁(yè)面代碼
|- scripts \ # 該目錄用于保存腳本文件,例如:?jiǎn)?dòng)腳本、導(dǎo)入向量數(shù)據(jù)庫(kù)腳本等
|- test_result \ # 該目錄用于保存測(cè)試結(jié)果
|- docker \
|- backend \ # 該目錄對(duì)應(yīng)后端python服務(wù)的Dockerfile
|- frontend \ # 該目錄對(duì)應(yīng)前端python服務(wù)的Dockerfile
上述目錄中,??dataset?
? 是直接使用git的submodul命令,直接將天池大賽提供的數(shù)據(jù)集引入到本項(xiàng)目中,方便后續(xù)使用。
引入方法:
git submodule add https://www.modelscope.cn/datasets/BJQW14B/bs_challenge_financial_14b_dataset.git dataset
命名規(guī)范
項(xiàng)目如果能夠約束統(tǒng)一的命名規(guī)范,這對(duì)于后續(xù)代碼的可讀性、可維護(hù)性會(huì)提供需要便利,在此我沿用了約定俗成的代碼命名規(guī)范:
- 類名:使用大駝峰命名法,例如:?
?MyClass?
? - 函數(shù)名:使用小駝峰命名法,例如:?
?my_function?
? - 變量名:使用小駝峰命名法,例如:?
?my_variable?
? - 文件夾:使用小駝峰命名法。
整體命名時(shí),要盡量見(jiàn)文知意。
2、實(shí)現(xiàn)基本的連接大模型的util庫(kù)
代碼文件及目錄:??app/utils/util.py?
?
from dotenv import load_dotenv
import os
# 獲取當(dāng)前文件的目錄
current_dir = os.path.dirname(__file__)
# 構(gòu)建到 conf/.qwen 的相對(duì)路徑
conf_file_path_qwen = os.path.join(current_dir,'..','conf','.qwen')
# 加載千問(wèn)環(huán)境變量
load_dotenv(dotenv_path=conf_file_path_qwen)
defget_qwen_models():
"""
加載千問(wèn)系列大模型
"""
# llm 大模型
from langchain_community.llms.tongyi importTongyi
llm =Tongyi(model="qwen-max", temperature=0.1, top_p=0.7, max_tokens=1024)
# chat 大模型
from langchain_community.chat_models importChatTongyi
chat =ChatTongyi(model="qwen-max", temperature=0.01, top_p=0.2, max_tokens=1024)
# embedding 大模型
from langchain_community.embeddings importDashScopeEmbeddings
embed =DashScopeEmbeddings(model="text-embedding-v3")
return llm, chat, embed
在app/conf/.qwen中,添加對(duì)應(yīng)的API KEY,例如:
DASHSCOPE_API_KEY = sk-xxxxxx
3、實(shí)現(xiàn)向量庫(kù)基礎(chǔ)功能
向量庫(kù)文件考慮使用Chroma實(shí)現(xiàn),所以我們先實(shí)現(xiàn)一個(gè)向量庫(kù)的類,用于完成基本的向量庫(kù)連接、數(shù)據(jù)入庫(kù)操作。
代碼文件及目錄:??app/rag/chroma_conn.py?
?
import chromadb
from chromadb importSettings
from langchain_chroma importChroma
classChromaDB:
def__init__(self,
chroma_server_type="local", # 服務(wù)器類型:http是http方式連接方式,local是本地讀取文件方式
host="localhost", port=8000, # 服務(wù)器地址,http方式必須指定
persist_path="chroma_db", # 數(shù)據(jù)庫(kù)的路徑:如果是本地連接,需要指定數(shù)據(jù)庫(kù)的路徑
collection_name="langchain", # 數(shù)據(jù)庫(kù)的集合名稱
embed=None # 數(shù)據(jù)庫(kù)的向量化函數(shù)
):
self.host = host
self.port = port
self.path = persist_path
self.embed = embed
self.store =None
# 如果是http協(xié)議方式連接數(shù)據(jù)庫(kù)
if chroma_server_type =="http":
client = chromadb.HttpClient(host=host, port=port)
self.store =Chroma(collection_name=collection_name,
embedding_function=embed,
client=client)
if chroma_server_type =="local":
self.store =Chroma(collection_name=collection_name,
embedding_function=embed,
persist_directory=persist_path)
if self.store isNone:
raiseValueError("Chroma store init failed!")
defadd_with_langchain(self, docs):
"""
將文檔添加到數(shù)據(jù)庫(kù)
"""
self.store.add_documents(documents=docs)
defget_store(self):
"""
獲得向量數(shù)據(jù)庫(kù)的對(duì)象實(shí)例
"""
return self.store
在實(shí)際項(xiàng)目實(shí)踐過(guò)程中,我們發(fā)現(xiàn)導(dǎo)入Chroma數(shù)據(jù)時(shí)使用本地化連接方式更快一些,所以對(duì)連接方式做了兩個(gè)參數(shù)的擴(kuò)展,local 代表本地連接,http 代表遠(yuǎn)程連接。
4、實(shí)現(xiàn)入庫(kù)功能
本著先跑通流程,再優(yōu)化交互過(guò)程的思路,對(duì)于PDF文件入向量庫(kù)的過(guò)程,我們先通過(guò)一段腳本實(shí)現(xiàn)(暫不做前端UI的交互)。
代碼文件及目錄:??app/rag/pdf_processor.py?
?
import os
import logging
import time
from tqdm import tqdm
from langchain_community.document_loaders importPyMuPDFLoader
from langchain_text_splitters importRecursiveCharacterTextSplitter
from rag.chroma_conn importChromaDB
classPDFProcessor:
def__init__(self,
directory, # PDF文件所在目錄
chroma_server_type, # ChromaDB服務(wù)器類型
persist_path, # ChromaDB持久化路徑
embed):# 向量化函數(shù)
self.directory = directory
self.file_group_num =80# 每組處理的文件數(shù)
self.batch_num =6# 每次插入的批次數(shù)量
self.chunksize =500# 切分文本的大小
self.overlap =100# 切分文本的重疊大小
self.chroma_db =ChromaDB(chroma_server_type=chroma_server_type,
persist_path=persist_path,
embed=embed)
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
defload_pdf_files(self):
"""
加載目錄下的所有PDF文件
"""
pdf_files =[]
for file in os.listdir(self.directory):
if file.lower().endswith('.pdf'):
pdf_files.append(os.path.join(self.directory, file))
logging.info(f"Found {len(pdf_files)} PDF files.")
return pdf_files
defload_pdf_content(self, pdf_path):
"""
讀取PDF文件內(nèi)容
"""
pdf_loader =PyMuPDFLoader(file_path=pdf_path)
docs = pdf_loader.load()
logging.info(f"Loading content from {pdf_path}.")
return docs
defsplit_text(self, documents):
"""
將文本切分成小段
"""
# 切分文檔
text_splitter =RecursiveCharacterTextSplitter(
chunk_size=self.chunksize,
chunk_overlap=self.overlap,
length_function=len,
add_start_index=True,
)
docs = text_splitter.split_documents(documents)
logging.info("Split text into smaller chunks with RecursiveCharacterTextSplitter.")
return docs
definsert_docs_chromadb(self, docs, batch_size=6):
"""
將文檔插入到ChromaDB
"""
# 分批入庫(kù)
logging.info(f"Inserting {len(docs)} documents into ChromaDB.")
# 記錄開(kāi)始時(shí)間
start_time = time.time()
total_docs_inserted =0
# 計(jì)算總批次
total_batches =(len(docs)+ batch_size -1)// batch_size
with tqdm(total=total_batches, desc="Inserting batches", unit="batch")as pbar:
for i inrange(0,len(docs), batch_size):
# 獲取當(dāng)前批次的樣本
batch = docs[i:i + batch_size]
# 將樣本入庫(kù)
self.chroma_db.add_with_langchain(batch)
# self.chroma_db.async_add_with_langchain(batch)
# 更新已插入的文檔數(shù)量
total_docs_inserted +=len(batch)
# 計(jì)算并顯示當(dāng)前的TPM
elapsed_time = time.time()- start_time # 計(jì)算已用時(shí)間(秒)
if elapsed_time >0:# 防止除以零
tpm =(total_docs_inserted / elapsed_time)*60# 轉(zhuǎn)換為每分鐘插入的文檔數(shù)
pbar.set_postfix({"TPM":f"{tpm:.2f}"})# 更新進(jìn)度條的后綴信息
# 更新進(jìn)度條
pbar.update(1)
defprocess_pdfs_group(self, pdf_files_group):
# 讀取PDF文件內(nèi)容
pdf_contents =[]
for pdf_path in pdf_files_group:
# 讀取PDF文件內(nèi)容
documents = self.load_pdf_content(pdf_path)
# 將documents 逐一添加到pdf_contents
pdf_contents.extend(documents)
# 將文本切分成小段
docs = self.split_text(pdf_contents)
# 將文檔插入到ChromaDB
self.insert_docs_chromadb(docs, self.batch_num)
defprocess_pdfs(self):
# 獲取目錄下所有的PDF文件
pdf_files = self.load_pdf_files()
group_num = self.file_group_num
# group_num 個(gè)PDF文件為一組,分批處理
for i inrange(0,len(pdf_files), group_num):
pdf_files_group = pdf_files[i:i + group_num]
self.process_pdfs_group(pdf_files_group)
print("PDFs processed successfully!")
5、測(cè)試導(dǎo)入功能
因?yàn)镻ython的導(dǎo)入庫(kù)的原因(一般都是從工作目錄查找),所以我們?cè)陧?xiàng)目根目錄下創(chuàng)建test_framework.py,方便后續(xù)統(tǒng)一測(cè)試工作。
smart-finance-bot \
|- app \ # 該目錄用于服務(wù)端代碼
|- rag \ # 該目錄用于保存rag相關(guān)代碼
|- pdf_processor.py
|- chroma_conn.py
|- test_framework.py
代碼文件: ??app/test_framework.py?
?
# 測(cè)試導(dǎo)入PDF到向量庫(kù)主流程
deftest_import():
from rag.pdf_processor importPDFProcessor
from utils.util import get_qwen_models
llm , chat , embed = get_qwen_models()
# embed = get_huggingface_embeddings()
directory ="./app/dataset/pdf"
persist_path ="chroma_db"
server_type ="local"
# 創(chuàng)建 PDFProcessor 實(shí)例
pdf_processor =PDFProcessor(directory=directory,
chroma_server_type=server_type,
persist_path=persist_path,
embed=embed)
# 處理 PDF 文件
pdf_processor.process_pdfs()
if __name__ =="__main__":
test_import()
1.通過(guò)命令行啟動(dòng)ChromaDB服務(wù)端:
chroma run --path chroma_db --port 8000 --host localhost
2.運(yùn)行test_framework.py
運(yùn)行效果:
備注:一般測(cè)試框架會(huì)使用Pytest并且編寫(xiě)相應(yīng)的單元測(cè)試函數(shù),本次項(xiàng)目中由于項(xiàng)目較小且函數(shù)返回結(jié)果不固定,所以就沒(méi)有寫(xiě)UnitTest。如果想了解Pytest的使用示例,可以參考我的其他代碼倉(cāng)庫(kù),例如:UnitTest的使用
6、實(shí)現(xiàn)檢索功能
代碼文件: ??app/rag/rag.py?
?
import logging
from langchain_core.prompts importChatPromptTemplate
from langchain_core.runnables importRunnablePassthrough
from langchain_core.runnables.base importRunnableLambda
from langchain_core.output_parsers importStrOutputParser
from.chroma_conn importChromaDB
# 配置日志記錄
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s')
classRagManager:
def__init__(self,
chroma_server_type="http",
host="localhost", port=8000,
persist_path="chroma_db",
llm=None, embed=None):
self.llm = llm
self.embed = embed
chrom_db =ChromaDB(chroma_server_type=chroma_server_type,
host=host, port=port,
persist_path=persist_path,
embed=embed)
self.store = chrom_db.get_store()
defget_chain(self, retriever):
"""獲取RAG查詢鏈"""
# RAG系統(tǒng)經(jīng)典的 Prompt (A 增強(qiáng)的過(guò)程)
prompt =ChatPromptTemplate.from_messages([
("human","""You are an assistant for question-answering tasks. Use the following pieces
of retrieved context to answer the question.
If you don't know the answer, just say that you don't know.
Use three sentences maximum and keep the answer concise.
Question: {question}
Context: {context}
Answer:""")
])
# 將 format_docs 方法包裝為 Runnable
format_docs_runnable =RunnableLambda(self.format_docs)
# RAG 鏈
rag_chain =(
{"context": retriever | format_docs_runnable,
"question":RunnablePassthrough()}
| prompt
| self.llm
|StrOutputParser()
)
return rag_chain
defformat_docs(self, docs):
# 返回檢索到的資料文件名稱
logging.info(f"檢索到資料文件個(gè)數(shù):{len(docs)}")
retrieved_files ="\n".join([doc.metadata["source"]for doc in docs])
logging.info(f"資料文件分別是:\n{retrieved_files}")
retrieved_content ="\n\n".join(doc.page_content for doc in docs)
logging.info(f"檢索到的資料為:\n{retrieved_content}")
return retrieved_content
defget_retriever(self, k=4, mutuality=0.3):
retriever = self.store.as_retriever(search_type="similarity_score_threshold",
search_kwargs={"k": k,"score_threshold": mutuality})
return retriever
defget_result(self, question, k=4, mutuality=0.3):
"""獲取RAG查詢結(jié)果"""
retriever = self.get_retriever(k, mutuality)
rag_chain = self.get_chain(retriever)
return rag_chain.invoke(input=question)
以上是實(shí)現(xiàn)了一個(gè)使用基本檢索器的RAG,其中:
- 代碼中通過(guò)chroma_conn.py模塊連接到ChromaDB數(shù)據(jù)庫(kù),并使用ChromaDB的as_retriever方法創(chuàng)建一個(gè)檢索器。
7、測(cè)試檢索效果
在test_framework.py中添加RAG的測(cè)試調(diào)用函數(shù)。
代碼文件:??app/test_framework.py?
?
# 測(cè)試RAG主流程
deftest_rag():
from rag.rag importRagManager
from utils.util import get_qwen_models
llm, chat, embed = get_qwen_models()
rag =RagManager(host="localhost", port=8000, llm=llm, embed=embed)
result = rag.get_result("湖南長(zhǎng)遠(yuǎn)鋰科股份有限公司變更設(shè)立時(shí)作為發(fā)起人的法人有哪些?")
print(result)
if __name__ =="__main__":
test_rag()# RAG測(cè)試函數(shù)
# test_import() # 批量導(dǎo)入PDF測(cè)試函數(shù)
注釋掉批量導(dǎo)入函數(shù),開(kāi)啟test_rag()函數(shù),運(yùn)行效果:
至此,我們完成了RAG模塊的基本功能,它包括PDF文件的批量導(dǎo)入以及檢索功能。
內(nèi)容小結(jié)
- 首先,我們創(chuàng)建了一個(gè)ChromaDB的類,封裝了基礎(chǔ)的Chroma連接、插入、檢索。
- 其次,我們實(shí)現(xiàn)了PDFProcessor類,該類中會(huì)調(diào)用ChromaDB類的插入函數(shù),將批量讀取的PDF文件進(jìn)行切分后保存至向量庫(kù)。
- 然后,我們實(shí)現(xiàn)了RagManager類,該類中封裝了RAG的檢索鏈,并定義了檢索的參數(shù)。
- 最后,我們實(shí)現(xiàn)了一個(gè)測(cè)試函數(shù),用于測(cè)試RAG的檢索功能。
- 除此之外,有兩個(gè)注意事項(xiàng):
在項(xiàng)目初期,進(jìn)行合理的項(xiàng)目文件目錄規(guī)劃,可以有效減少項(xiàng)目維護(hù)的難度。
在項(xiàng)目行進(jìn)中,通過(guò)搭建測(cè)試框架,可以方便函數(shù)驗(yàn)證以及后續(xù)重構(gòu)的回歸測(cè)試。
本文轉(zhuǎn)載自公眾號(hào)一起AI技術(shù) 作者:Dongming
原文鏈接:??https://mp.weixin.qq.com/s/ZMwZSsmms03U-GRRwnxMNg??
