譯者 | 朱先忠
審校 | 重樓
LangChain框架是一個(gè)非常強(qiáng)大的工具,它大大加快了LLM在項(xiàng)目和代理開發(fā)中的有效使用。該框架提供了一種高級(jí)抽象,允許開發(fā)人員立即開始使用模型并將其集成到他們的產(chǎn)品中。但是另一方面,了解LangChain的核心概念(例如Runnable的架構(gòu))對(duì)于構(gòu)建LLM代理和鏈的開發(fā)人員非常有益,因?yàn)樗峁┝艘环N結(jié)構(gòu)化的方法和對(duì)使用框架的洞察力。
LangChain架構(gòu)基礎(chǔ)
LangChain中的Runnable架構(gòu)基于命令模式的原理構(gòu)建,命令模式是一種將請(qǐng)求封裝為對(duì)象的行為設(shè)計(jì)模式。這種設(shè)計(jì)有助于參數(shù)化、排隊(duì)和動(dòng)態(tài)執(zhí)行命令,使Runnable對(duì)象在各種工作流中模塊化、可組合和可管理。
Runnable對(duì)象特別適合工作流管理、順序任務(wù)執(zhí)行、處理?xiàng)l件邏輯以及與外部系統(tǒng)交互,它們提供靈活性、可重用性和模塊化。你可以動(dòng)態(tài)地將任務(wù)鏈接在一起以創(chuàng)建復(fù)雜的行為場(chǎng)景,同時(shí)保持干凈且易于管理的代碼結(jié)構(gòu)。
Runnable鏈的可能配置之一
LangChain中執(zhí)行特定任務(wù)的大多數(shù)高級(jí)對(duì)象都實(shí)現(xiàn)了Runnable類。你計(jì)劃包含在鏈中的任何對(duì)象也必須以某種方式實(shí)現(xiàn)Runnable類。有趣的是,Runnable充當(dāng)命令的抽象或一個(gè)具體命令,同時(shí)也充當(dāng)了一個(gè)調(diào)用者和接收者角色。
一個(gè)值得注意的例子是此類中提供的管道方法,它專門用于創(chuàng)建鏈。此方法允許無縫組合多個(gè)Runnable,使其成為在LangChain中構(gòu)建和執(zhí)行工作流的基石。
在上圖中,你可以看到Runnable如何與其各種實(shí)現(xiàn)一起運(yùn)行。接下來,我們將在本文中詳細(xì)研究這些實(shí)現(xiàn)。
創(chuàng)建Runnable
實(shí)際上,有兩種方法可以創(chuàng)建Runnable:通過RunnableLambda或擴(kuò)展基Runnable類。
將RunnableLambda用于簡(jiǎn)單函數(shù)
創(chuàng)建Runnable的最簡(jiǎn)單方法是使用RunnableLambda。此類允許你將任何函數(shù)包裝為Runnable,從而允許動(dòng)態(tài)行為而無需自定義類。
import { RunnableLambda } from "@langchain/core/runnables";
//定義一個(gè)簡(jiǎn)單函數(shù)
const toUpperCase = (text: string): string => text.toUpperCase();
//將函數(shù)包裝為Runnable
const upperCaseRunnable = RunnableLambda.from(toUpperCase);
//調(diào)用Runnable
const result = await upperCaseRunnable.invoke("hello world");
//輸出: "HELLO WORLD"
擴(kuò)展Runnable基類
對(duì)于更高級(jí)的使用場(chǎng)景,你可以擴(kuò)展Runnable基類。此方法可完全控制執(zhí)行生命周期,包括調(diào)用、批處理和流等方法。
import { Runnable } from "@langchain/core/runnables";
class GreetUserRunnable extends Runnable<string, string> {
lc_namespace = ["GreetUser"];
onStart(data: { input: string }) {
console.log(`Starting with input: ${data.input}`);
}
onEnd(data: { result: string }) {
console.log(`Finished with result: ${data.result}`);
}
onError(error: unknown) {
console.error(`Error occurred: ${(error as Error).message}`);
}
//自定義執(zhí)行邏輯
async invoke(name: string): Promise<string> {
this.onStart({ input: name });
try {
const greeting = `Hello, ${name}!`;
this.onEnd({ result: greeting });
return greeting;
} catch (error) {
this.onError(error);
throw error;
}
}
}
使用Runnable構(gòu)建工作流
LangChain中的Runnable架構(gòu)通過按功能分組的專用Runnable進(jìn)行了擴(kuò)展,使其用途廣泛且適用于各種應(yīng)用程序。
路由和分支類型
根據(jù)條件或輸入管理執(zhí)行流程的Runnable有:
RouterRunnable
根據(jù)給定的鍵值將輸入定向到特定的Runnable,類似于switch-case語句。適用于基于運(yùn)行時(shí)參數(shù)的動(dòng)態(tài)任務(wù)執(zhí)行。
import { RouterRunnable, RunnableLambda } from "@langchain/core/runnables";
const router = new RouterRunnable({
runnables: {
billing: RunnableLambda.from((query: string) => `Billing Department: ${query}`),
technical: RunnableLambda.from((query: string) => `Technical Support: ${query}`),
general: RunnableLambda.from((query: string) => `General Inquiry: ${query}`),
},
});
//路由一個(gè)賬單問題
const result1 = await router.invoke({ key: "billing", input: "I have a question about my invoice." });
// 輸出: "Billing Department: I have a question about my invoice."
//路由一個(gè)技術(shù)問題
const result2 = await router.invoke({ key: "technical", input: "My internet is not working." });
// 輸出: "Technical Support: My internet is not working."
RunnableBranch
根據(jù)條件檢查從多個(gè)選項(xiàng)中執(zhí)行特定的Runnable,使工作流能夠適應(yīng)不同的輸入場(chǎng)景。
const branch = RunnableBranch.from([
[
(user: { age: number }) => user.age < 18,
RunnableLambda.from((user) => `Hey ${user.name}, check out our new teen collection!`),
],
[
(user: { age: number }) => user.age >= 18 && user.age < 30,
RunnableLambda.from((user) => `Hi ${user.name}, explore our trendy outfits for young adults!`),
],
RunnableLambda.from((user) => `Hello ${user.name}, discover our premium range!`),
]);
const result = await branch.invoke({ name: "Alice", age: 25 });
// 輸出: "Hi Alice, explore our trendy outfits for young adults!"
數(shù)據(jù)操作和分配類型
轉(zhuǎn)換或準(zhǔn)備數(shù)據(jù)以用于后續(xù)任務(wù)的Runnable有:
RunnableAssign
通過添加新字段或更新現(xiàn)有字段來增強(qiáng)或修改輸入數(shù)據(jù),為后續(xù)處理步驟做好準(zhǔn)備。
import { RunnableAssign, RunnableLambda } from "@langchain/core/runnables";
const getGeolocation = RunnableLambda.from(async (x: { ip: string }) => {
//模擬一個(gè)API調(diào)用來獲取地理定位
return { location: `Location for IP ${x.ip}` };
});
const runnableAssign = new RunnableAssign({ getGeolocation });
const res = await runnableAssign.invoke({ name: "John Doe", ip: "192.168.1.1" });
// 輸出結(jié)果: { name: "John Doe", ip: "192.168.1.1", getGeolocation: { location: "Location for IP 192.168.1.1" } }
RunnablePick
從輸入數(shù)據(jù)中選擇并提取特定字段,從而可以對(duì)相關(guān)信息進(jìn)行重點(diǎn)處理。
import { RunnablePick } from "@langchain/core/runnables";
const orderData = {
orderId: "12345",
customerEmail: "customer@example.com",
items: [{ productId: "A1", quantity: 2 }],
totalAmount: 99.99,
shippingAddress: "123 Main St",
};
const receiptInfoRunnable = new RunnablePick(["orderId", "customerEmail", "totalAmount"]);
const res = await receiptInfoRunnable.invoke(orderData);
// 輸出結(jié)果: { orderId: '12345', customerEmail: 'customer@example.com', totalAmount: 99.99 }
RunnablePassthrough
不做任何更改地傳遞輸入數(shù)據(jù),這對(duì)于維護(hù)工作流中的數(shù)據(jù)完整性非常有用。
const chain = RunnableSequence.from([
{
question: new RunnablePassthrough(),
context: async () => loadContextFromStore(),
},
prompt,
llm,
outputParser,
]);
const response = await chain.invoke(
"I can pass a single string instead of an object since I'm using `RunnablePassthrough`."
);
RunnableMap
將轉(zhuǎn)換應(yīng)用于映射對(duì)象中的每個(gè)字段,從而實(shí)現(xiàn)對(duì)鍵值對(duì)的單獨(dú)處理。
const sensorDataRunnable = RunnableMap.from({
temperature: RunnableLambda.from((data: { temp: number }) => `Temperature is ${data.temp}°C`),
humidity: RunnableLambda.from((data: { humidity: number }) => `Humidity is ${data.humidity}%`),
});
const result = await sensorDataRunnable.invoke({ temp: 22, humidity: 45 });
// 輸出結(jié)果: { temperature: 'Temperature is 22°C', humidity: 'Humidity is 45%' }
序列和工作流組合類型
可按順序構(gòu)造和執(zhí)行任務(wù)的Runnable,支持創(chuàng)建復(fù)雜的工作流:
RunnableSequence
以線性方式鏈接多個(gè)Runnable,其中一個(gè)Runnable的輸出成為下一個(gè)Runnable的輸入,形成逐步處理的管道結(jié)構(gòu)。
const imageProcessingChain = RunnableSequence.from([
readImageRunnable,
resizeImageRunnable,
applyFilterRunnable,
saveImageRunnable,
]);
const result = await imageProcessingChain.invoke('path/to/input/image.jpg');
RunnableEach
將Runnable應(yīng)用于集合中的每個(gè)元素,類似于數(shù)組上的映射函數(shù),允許批處理。
import { RunnableEach, RunnableLambda } from "@langchain/core/runnables";
const personalizeEmail = RunnableLambda.from((name: string) => `Dear ${name}, we have an offer for you!`);
const sendEmail = emailSendingRunnable; // 假設(shè)這是在其他地方定義的
const emailChain = new RunnableEach({
bound: personalizeEmail.pipe(sendEmail),
});
const result = await emailChain.invoke(["Alice", "Bob", "Carol"]);
//將電子郵件發(fā)送到:Alice, Bob, and Carol.
RunnableParallel
在同一輸入上同時(shí)執(zhí)行多個(gè)Runnable,從而實(shí)現(xiàn)并發(fā)處理以提高效率。
import { RunnableLambda, RunnableParallel } from "@langchain/core/runnables";
const calculateMean = RunnableLambda.from((data: number[]) => {
return data.reduce((a, b) => a + b, 0) / data.length;
});
const calculateMedian = RunnableLambda.from((data: number[]) => {
const sorted = data.slice().sort((a, b) => a - b);
const mid = Math.floor(sorted.length / 2);
return sorted.length % 2 !== 0 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2;
});
const calculateMode = RunnableLambda.from((data: number[]) => {
const frequency: { [key: number]: number } = {};
let maxFreq = 0;
let modes: number[] = [];
data.forEach((item) => {
frequency[item] = (frequency[item] || 0) + 1;
if (frequency[item] > maxFreq) {
maxFreq = frequency[item];
modes = [item];
} else if (frequency[item] === maxFreq) {
modes.push(item);
}
});
return modes;
});
const analysisChain = RunnableParallel.from({
mean: calculateMean,
median: calculateMedian,
mode: calculateMode,
});
const res = await analysisChain.invoke([1, 2, 2, 3, 4]);
// 輸出結(jié)果: { mean: 2.4, median: 2, mode: [2] }
錯(cuò)誤處理、彈性和配置類型
通過重試機(jī)制和回退選項(xiàng)增強(qiáng)穩(wěn)健性的Runnable有:
RunnableBinding
通過預(yù)設(shè)某些參數(shù)或配置創(chuàng)建自定義Runnable,允許針對(duì)特定上下文定制可重復(fù)使用的組件。
import { RunnableConfig, RunnableLambda } from "@langchain/core/runnables";
const queryDatabase = (query: string, config?: RunnableConfig) => {
const dbConfig = config?.configurable?.dbConfig;
//使用dbConfig建立連接并執(zhí)行查詢
return `Executed query on ${dbConfig.host}: ${query}`;
};
const runnable = RunnableLambda.from(queryDatabase);
// 綁定用在不同環(huán)境中的配置
const prodRunnable = runnable.bind({ configurable: { dbConfig: { host: 'prod.db.example.com' } } });
const testRunnable = runnable.bind({ configurable: { dbConfig: { host: 'test.db.example.com' } } });
const result1 = await prodRunnable.invoke("SELECT * FROM users;");
// 輸出結(jié)果: "Executed query on prod.db.example.com: SELECT * FROM users;"
const result2 = await testRunnable.invoke("SELECT * FROM users;");
// 輸出結(jié)果: "Executed query on test.db.example.com: SELECT * FROM users;"
RunnableRetry
根據(jù)指定的重試策略在失敗時(shí)自動(dòng)重試Runnable,增強(qiáng)對(duì)瞬態(tài)錯(cuò)誤的恢復(fù)能力。
import { RunnableLambda } from "@langchain/core/runnables";
const fetchWeatherData = async (location: string): Promise<string> => {
//模擬一個(gè)可能會(huì)失敗的API調(diào)用
if (Math.random() < 0.7) {
throw new Error("Network error");
}
return `Weather data for ${location}`;
};
const fetchWeatherLambda = RunnableLambda.from(fetchWeatherData);
//應(yīng)用重試邏輯
const fetchWeatherWithRetry = fetchWeatherLambda.withRetry({ stopAfterAttempt: 5 });
try {
const res = await fetchWeatherWithRetry.invoke("New York");
console.log(res);
} catch (error) {
console.error("Failed to fetch weather data after retries:", error.message);
}
RunnableWithFallbacks
如果主Runnable失敗,則提供要執(zhí)行的替代Runnable,以確保工作流程可以繼續(xù)或正常降級(jí)。
import { RunnableLambda, RunnableWithFallbacks } from "@langchain/core/runnables";
const primaryDataSource = async (id: string): Promise<string> => {
//模擬失敗故障
throw new Error("Primary data source is unavailable");
};
const secondaryDataSource = async (id: string): Promise<string> => {
return `Data for ${id} from secondary source`;
};
const primaryRunnable = RunnableLambda.from(primaryDataSource);
const fallbackRunnable = RunnableLambda.from(secondaryDataSource);
//帶回退的設(shè)置
const dataRunnable = primaryRunnable.withFallbacks([fallbackRunnable]);
const res = await dataRunnable.invoke("item123");
// 輸出結(jié)果: "Data for item123 from secondary source"
整合
在前面的部分中,我們探討了單個(gè)Runnable及其在構(gòu)建模塊化工作流中的作用?,F(xiàn)在,讓我們看看如何組合這些Runnable來創(chuàng)建全面的實(shí)際應(yīng)用程序。以下三個(gè)示例演示了如何集成多個(gè)Runnable來解決復(fù)雜問題。
示例1:智能文檔處理管道
現(xiàn)在,我們假定有一家公司希望自動(dòng)處理發(fā)票、收據(jù)和合同等傳入文檔。開發(fā)的軟件系統(tǒng)目標(biāo)是對(duì)文檔類型進(jìn)行分類、提取相關(guān)數(shù)據(jù)、驗(yàn)證數(shù)據(jù)并將其存儲(chǔ)在數(shù)據(jù)庫中,并且能夠妥善處理錯(cuò)誤,并在發(fā)生瞬時(shí)故障時(shí)重試操作。
使用的Runnable有RunnableSequence、RouterRunnable、RunnableParallel、RunnableRetry、RunnableWithFallbacks、RunnableAssign、RunnableLambda等,關(guān)鍵代碼如下:
import {
RunnableSequence,
RouterRunnable,
RunnableLambda,
} from "@langchain/core/runnables";
//定義一個(gè)統(tǒng)一的輸出類型
type UnifiedOutput = {
type: string;
amount?: number;
dueDate?: string;
client?: string;
parties?: string[];
term?: string;
total?: number;
items?: string[];
};
// 步驟1:OCR處理(用函數(shù)模擬)
const ocrRunnable = RunnableLambda.from(async (imageBuffer: string) => {
//模擬OCR處理
return "Extracted text: Invoice for Acme Corp";
});
//步驟2:文件分類
const classifyDocument = RunnableLambda.from(async (text: string) => {
//模擬文件分類
if (text.includes("Invoice")) return "invoice";
if (text.includes("Contract")) return "contract";
return "receipt";
});
//步驟3:每種文檔類型的數(shù)據(jù)提取Runnables
const extractInvoiceData = RunnableLambda.from(
async (text: string): Promise<UnifiedOutput> => {
// 提取特定于發(fā)票的數(shù)據(jù)
return {
type: "invoice",
amount: 1000,
dueDate: "2024-12-31",
client: "Acme Corp",
};
}
);
const extractContractData = RunnableLambda.from(
async (text: string): Promise<UnifiedOutput> => {
// 提取特定于合同的數(shù)據(jù)
return {
type: "contract",
parties: ["Company A", "Company B"],
term: "2 years",
};
}
);
const extractReceiptData = RunnableLambda.from(
async (text: string): Promise<UnifiedOutput> => {
//提取特定于收據(jù)的數(shù)據(jù)
return { type: "receipt", total: 50, items: ["Item1", "Item2"] };
}
);
const dataExtractionRouter = new RouterRunnable({
runnables: {
invoice: extractInvoiceData,
contract: extractContractData,
receipt: extractReceiptData,
},
});
// 步驟5:數(shù)據(jù)驗(yàn)證
const validateData = RunnableLambda.from(async (data: UnifiedOutput) => {
//執(zhí)行驗(yàn)證邏輯
if (!data || !data.type)
throw new Error("Validation failed: Data is missing or invalid");
return { ...data, isValid: true };
});
// 步驟6:保存到數(shù)據(jù)庫(使用一個(gè)函數(shù)進(jìn)行模擬)
const saveToDatabase = RunnableLambda.from(async (data: UnifiedOutput) => {
// 模擬保存到數(shù)據(jù)庫
return `Data saved: ${JSON.stringify(data)}`;
});
//步驟7:構(gòu)建工作流序列
const documentProcessingWorkflow = RunnableSequence.from<string, any>([
ocrRunnable,
classifyDocument,
dataExtractionRouter,
validateData,
saveToDatabase.withRetry({ stopAfterAttempt: 3 }),
]);
// 步驟8:添加帶有回退支持的錯(cuò)誤處理
const workflowWithFallback = documentProcessingWorkflow.withFallbacks({
fallbacks: [
RunnableLambda.from(async () => {
return "An error occurred. Please try again later.";
}),
],
});
//執(zhí)行工作流
(async () => {
try {
const result = await workflowWithFallback.invoke("Document image bytes");
console.log(result);
//期望的輸出結(jié)果: "Data saved: { type: 'invoice', amount: 1000, dueDate: '2024-12-31', client: 'Acme Corp', isValid: true }"
} catch (error: any) {
console.error("Failed to process document:", (error as Error).message);
}
})();
在上面代碼中,工作流首先使用ocrRunnable將文檔圖像轉(zhuǎn)換為文本。提取的文本被分類為文檔類型(發(fā)票、合同或收據(jù))。RouterRunnable根據(jù)文檔類型將文本定向到適當(dāng)?shù)臄?shù)據(jù)提取Runnable。提取的數(shù)據(jù)經(jīng)過驗(yàn)證,然后保存到數(shù)據(jù)庫。RunnableRetry確保在發(fā)生瞬時(shí)故障時(shí)重試保存最多三次。如果任一步驟失敗,RunnableWithFallbacks會(huì)提供回退消息以妥善處理錯(cuò)誤。
示例2:個(gè)性化推薦引擎
電子商務(wù)平臺(tái)希望根據(jù)用戶的瀏覽歷史和偏好為他們提供個(gè)性化的產(chǎn)品推薦。
使用的Runnable有RunnableParallel、RunnableMap、RunnableBranch、RunnableWithFallbacks等,相關(guān)代碼如下:
import {
RunnableParallel,
RunnableMap,
RunnableBranch,
RunnableSequence,
RunnableLambda,
} from "@langchain/core/runnables";
// 步驟1:并行地從多個(gè)來源獲取用戶數(shù)據(jù)
const fetchUserData = RunnableParallel.from({
browsingHistory: RunnableLambda.from(async (userId) => {
// 模擬獲取瀏覽歷史記錄
return ["Item1", "Item2"];
}),
purchaseHistory: RunnableLambda.from(async (userId) => {
// 模擬獲取購買歷史記錄
return ["Item3"];
}),
});
// 步驟2:映射到獲取的數(shù)據(jù)上以進(jìn)行處理
const processUserData = RunnableMap.from({
browsingHistory: RunnableLambda.from((history: any[]) => {
//處理瀏覽歷史記錄
return history.map((item) => `Processed ${item}`);
}),
purchaseHistory: RunnableLambda.from((history: any[]) => {
// 處理采購歷史記錄
return history.map((item) => `Processed ${item}`);
}),
});
//步驟3:定義推薦算法
const newUserRecommendations = RunnableLambda.from(async (user) => {
// 針對(duì)新用戶的邏輯
return ["Product A", "Product B", "Product C"];
});
const returningUserRecommendations = RunnableLambda.from(async (user) => {
//基于歷史記錄返回用戶的邏輯
return ["Product X", "Product Y", "Product Z"];
});
// 步驟4:基于用戶類型的分支處理
const recommendationBranch = RunnableBranch.from([
[(user: any) => user.isNew, newUserRecommendations],
returningUserRecommendations,
]);
//步驟5:創(chuàng)建一個(gè)回退推薦系統(tǒng)
const defaultRecommendations = RunnableLambda.from(async (user) => {
//默認(rèn)的推薦
return ["Default Product 1", "Default Product 2"];
});
const recommendationWithFallback = recommendationBranch.withFallbacks([
defaultRecommendations,
]);
// 步驟6:對(duì)整個(gè)工作流進(jìn)行排隊(duì)操作
const recommendationWorkflow = RunnableSequence.from([
fetchUserData,
processUserData,
(data) => ({ ...data, isNew: data.purchaseHistory.length === 0 }),
recommendationWithFallback,
]);
// 用法
const userId = "user123";
const recommendations = recommendationWorkflow.invoke(userId);
// 輸出:基于用戶數(shù)據(jù)的個(gè)性化推薦
在上面代碼中,工作流首先使用RunnableParallel同時(shí)獲取用戶的瀏覽歷史、購買歷史和個(gè)人資料。然后,使用RunnableMap單獨(dú)處理每條數(shù)據(jù),為生成推薦做好準(zhǔn)備。
RunnableBranch根據(jù)用戶的個(gè)人資料決定使用哪種推薦算法:
- 如果用戶是高級(jí)會(huì)員(isPremiumMembe為true),則使用premiumUserRecommendations。
- 如果用戶沒有購買歷史(表示是新用戶),則使用newUserRecommendations。
- 否則,默認(rèn)為regularUserRecommendations。
如果推薦過程中的任一步驟失敗,RunnableWithFallbacks可確保系統(tǒng)提供一組默認(rèn)推薦,從而保持良好的用戶體驗(yàn)。
最后,RunnableSequence協(xié)調(diào)整個(gè)工作流,確保每個(gè)步驟都按正確的順序進(jìn)行。工作流通過使用userId調(diào)用來執(zhí)行,并根據(jù)用戶的數(shù)據(jù)輸出個(gè)性化推薦。
示例3:用于分析的數(shù)據(jù)處理管道
現(xiàn)在,我們假定有一家公司需要處理大型數(shù)據(jù)集以生成涉及數(shù)據(jù)清理、轉(zhuǎn)換、分析和可視化的分析報(bào)告。
使用的Runnable有RunnableSequence、RunnableEach、RunnableRetry、RunnableBinding,關(guān)鍵代碼如下:
import {
RunnableSequence,
RunnableEach,
RunnableLambda,
} from "@langchain/core/runnables";
// 步驟1:通過使用重試的方式定義數(shù)據(jù)獲取
const fetchData = RunnableLambda.from(async (source) => {
// Simulate data fetching, which may fail
if (Math.random() < 0.2) {
throw new Error("Data fetch error");
}
return `Data from ${source}`;
}).withRetry({ stopAfterAttempt: 3 });
//步驟2:數(shù)據(jù)清理
const cleanData = RunnableLambda.from((data) => {
// Perform data cleaning
return `Cleaned ${data}`;
});
// 步驟3:數(shù)據(jù)轉(zhuǎn)換
const transformData = RunnableLambda.from((data) => {
// Transform data
return `Transformed ${data}`;
});
//步驟4:數(shù)據(jù)分析
const analyzeData = RunnableLambda.from((data) => {
// Analyze data
return `Analysis results of ${data}`;
});
//步驟5:數(shù)據(jù)可視化
const visualizeData = RunnableLambda.from((analysis) => {
// Generate visualization
return `Visualization of ${analysis}`;
});
//步驟6:步驟排隊(duì)
const dataProcessingSequence = RunnableSequence.from([
cleanData,
transformData,
analyzeData,
visualizeData,
]);
// 步驟7:處理多個(gè)數(shù)據(jù)源
const dataSources = ["Dataset A", "Dataset B", "Dataset C"];
const processAllData = new RunnableEach({
bound: fetchData.pipe(dataProcessingSequence),
});
// 用法
const reports = processAllData.invoke(dataSources);
// 輸出:每個(gè)數(shù)據(jù)源的可視化結(jié)果數(shù)組
上述代碼中,此工作流處理來自不同來源的多個(gè)數(shù)據(jù)集的數(shù)據(jù)處理。首先,定義一個(gè)使用RunnableBinding綁定到特定數(shù)據(jù)源的fetchData可運(yùn)行程序。每個(gè)數(shù)據(jù)獲取操作都使用RunnableRetry包裝,通過重試最多三次來處理瞬時(shí)故障。
從每個(gè)源獲取的數(shù)據(jù)經(jīng)過RunnableSequence定義的一系列處理步驟:
- 數(shù)據(jù)清理:刪除或更正錯(cuò)誤數(shù)據(jù)。
- 數(shù)據(jù)轉(zhuǎn)換:將數(shù)據(jù)轉(zhuǎn)換為適合分析的格式。
- 數(shù)據(jù)分析:執(zhí)行分析計(jì)算。
- 數(shù)據(jù)可視化:生成針對(duì)分析結(jié)果的可視化表示。
在此,使用RunnableEach來并行處理多個(gè)數(shù)據(jù)集。這個(gè)對(duì)象將相同的處理順序應(yīng)用于每個(gè)數(shù)據(jù)集上。
結(jié)論
總體來看,LangChain中的Runnable架構(gòu)是構(gòu)建涉及大型語言模型(LLM)的復(fù)雜模塊化工作流的強(qiáng)大基礎(chǔ)。在本文中,我們探討了如何創(chuàng)建和組合各種Runnable以應(yīng)對(duì)各種挑戰(zhàn):
- 路由和分支:利用RouterRunnable和RunnableBranch可以根據(jù)運(yùn)行時(shí)條件實(shí)現(xiàn)動(dòng)態(tài)執(zhí)行路徑。
- 數(shù)據(jù)操作和分配:RunnableAssign、RunnablePick和RunnableMap等工具提供靈活的數(shù)據(jù)轉(zhuǎn)換功能,為后續(xù)處理步驟準(zhǔn)備輸入。
- 序列和工作流組合:通過使用RunnableSequence、RunnableEach和RunnableParallel鏈接任務(wù),開發(fā)人員可以協(xié)調(diào)流程,無論它們是需要順序執(zhí)行還是并行處理。
- 錯(cuò)誤處理和彈性:借助RunnableRetry和RunnableWithFallbacks,工作流可以優(yōu)雅地處理錯(cuò)誤并提供回退機(jī)制。
總之,Runnable提倡了一種結(jié)構(gòu)化的方法來構(gòu)建LLM代理和鏈。最后,在你計(jì)劃將LangChain集成到實(shí)際項(xiàng)目中時(shí),請(qǐng)認(rèn)真考慮Runnables如何增強(qiáng)你的工作流,使其更靈活、更有彈性且更易于維護(hù)。
譯者介紹
朱先忠,51CTO社區(qū)編輯,51CTO專家博客、講師,濰坊一所高校計(jì)算機(jī)教師,自由編程界老兵一枚。
原文標(biāo)題:Guide to LangChain Runnable Architecture,作者:Pavlo Sobchuk