自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

LangChain Runnable架構(gòu)實(shí)戰(zhàn)解析

譯文 精選
人工智能
本文將帶你深入了解LangChain框架背后的基礎(chǔ)架構(gòu)并提供基本實(shí)戰(zhàn)代碼,以及開發(fā)復(fù)雜的基于AI代理系統(tǒng)時(shí)直接可用的通用代碼。

譯者 | 朱先忠

審校 | 重樓

LangChain框架是一個(gè)非常強(qiáng)大的工具,它大大加快了LLM在項(xiàng)目和代理開發(fā)中的有效使用。該框架提供了一種高級(jí)抽象,允許開發(fā)人員立即開始使用模型并將其集成到他們的產(chǎn)品中。但是另一方面,了解LangChain的核心概念(例如Runnable的架構(gòu))對(duì)于構(gòu)建LLM代理和鏈的開發(fā)人員非常有益,因?yàn)樗峁┝艘环N結(jié)構(gòu)化的方法和對(duì)使用框架的洞察力。

LangChain架構(gòu)基礎(chǔ)

LangChain中的Runnable架構(gòu)基于命令模式的原理構(gòu)建,命令模式是一種將請(qǐng)求封裝為對(duì)象的行為設(shè)計(jì)模式。這種設(shè)計(jì)有助于參數(shù)化、排隊(duì)和動(dòng)態(tài)執(zhí)行命令,使Runnable對(duì)象在各種工作流中模塊化、可組合和可管理。

Runnable對(duì)象特別適合工作流管理、順序任務(wù)執(zhí)行、處理?xiàng)l件邏輯以及與外部系統(tǒng)交互,它們提供靈活性、可重用性和模塊化。可以動(dòng)態(tài)地將任務(wù)鏈接在一起以創(chuàng)建復(fù)雜的行為場(chǎng)景,同時(shí)保持干凈且易于管理的代碼結(jié)構(gòu)。

Runnable鏈的可能配置之一Runnable鏈的可能配置之一

LangChain中執(zhí)行特定任務(wù)的大多數(shù)高級(jí)對(duì)象都實(shí)現(xiàn)了Runnable類。計(jì)劃包含在鏈中的任何對(duì)象也必須以某種方式實(shí)現(xiàn)Runnable類。有趣的是,Runnable充當(dāng)命令的抽象或一個(gè)具體命令,同時(shí)充當(dāng)了一個(gè)調(diào)用者和接收者角色

一個(gè)值得注意的例子是此類中提供的管道方法,它專門用于創(chuàng)建鏈。此方法允許無縫組合多個(gè)Runnable,使其成為在LangChain中構(gòu)建和執(zhí)行工作流的基石。

在上圖中,可以看到Runnable如何與其各種實(shí)現(xiàn)一起運(yùn)行。接下來,我們將在本文中詳細(xì)研究這些實(shí)現(xiàn)。

創(chuàng)建Runnable

實(shí)際上,有兩種方法可以創(chuàng)建Runnable:通過RunnableLambda或擴(kuò)展基Runnable類。

將RunnableLambda用于簡(jiǎn)單函數(shù)

創(chuàng)建Runnable的最簡(jiǎn)單方法是使用RunnableLambda。此類允許將任何函數(shù)包裝為Runnable,從而允許動(dòng)態(tài)行為而無需自定義類。

import { RunnableLambda } from "@langchain/core/runnables";

//定義一個(gè)簡(jiǎn)單函數(shù)
const toUpperCase = (text: string): string => text.toUpperCase();

//將函數(shù)包裝為Runnable
const upperCaseRunnable = RunnableLambda.from(toUpperCase);

//調(diào)用Runnable
const result = await upperCaseRunnable.invoke("hello world");
//輸出: "HELLO WORLD"

擴(kuò)展Runnable基類

對(duì)于更高級(jí)的使用場(chǎng)景,可以擴(kuò)展Runnable基類。此方法可完全控制執(zhí)行生命周期,包括調(diào)用、批處理和流等方法。

import { Runnable } from "@langchain/core/runnables";

class GreetUserRunnable extends Runnable<string, string> {
 lc_namespace = ["GreetUser"];

 onStart(data: { input: string }) {
 console.log(`Starting with input: ${data.input}`);
 }

 onEnd(data: { result: string }) {
 console.log(`Finished with result: ${data.result}`);
 }

 onError(error: unknown) {
 console.error(`Error occurred: ${(error as Error).message}`);
 }
 //自定義執(zhí)行邏輯
 async invoke(name: string): Promise<string> {
 this.onStart({ input: name });
 try {
 const greeting = `Hello, ${name}!`;
 this.onEnd({ result: greeting });
 return greeting;
 } catch (error) {
 this.onError(error);
 throw error;
 }
 }
}

使用Runnable構(gòu)建工作流

LangChain中的Runnable架構(gòu)通過按功能分組的專用Runnable進(jìn)行了擴(kuò)展,使其用途廣泛且適用于各種應(yīng)用程序。

路由和分支類型

根據(jù)條件或輸入管理執(zhí)行流程的Runnable

RouterRunnable

根據(jù)給定的將輸入定向到特定的Runnable,類似于switch-case語句。適用于基于運(yùn)行時(shí)參數(shù)的動(dòng)態(tài)任務(wù)執(zhí)行。

import { RouterRunnable, RunnableLambda } from "@langchain/core/runnables";

const router = new RouterRunnable({
 runnables: {
 billing: RunnableLambda.from((query: string) => `Billing Department: ${query}`),
 technical: RunnableLambda.from((query: string) => `Technical Support: ${query}`),
 general: RunnableLambda.from((query: string) => `General Inquiry: ${query}`),
 },
});

//路由一個(gè)賬單問題
const result1 = await router.invoke({ key: "billing", input: "I have a question about my invoice." });
// 輸出: "Billing Department: I have a question about my invoice."

//路由一個(gè)技術(shù)問題
const result2 = await router.invoke({ key: "technical", input: "My internet is not working." });
// 輸出: "Technical Support: My internet is not working."

RunnableBranch

根據(jù)條件檢查從多個(gè)選項(xiàng)中執(zhí)行特定的Runnable,使工作流能夠適應(yīng)不同的輸入場(chǎng)景。

const branch = RunnableBranch.from([
 [
 (user: { age: number }) => user.age < 18,
 RunnableLambda.from((user) => `Hey ${user.name}, check out our new teen collection!`),
 ],
 [
 (user: { age: number }) => user.age >= 18 && user.age < 30,
 RunnableLambda.from((user) => `Hi ${user.name}, explore our trendy outfits for young adults!`),
 ],
 RunnableLambda.from((user) => `Hello ${user.name}, discover our premium range!`),
]);

const result = await branch.invoke({ name: "Alice", age: 25 });
// 輸出: "Hi Alice, explore our trendy outfits for young adults!"

數(shù)據(jù)操作和分配類型

轉(zhuǎn)換或準(zhǔn)備數(shù)據(jù)以用于后續(xù)任務(wù)的Runnable

RunnableAssign

通過添加新字段或更新現(xiàn)有字段來增強(qiáng)或修改輸入數(shù)據(jù),為后續(xù)處理步驟做好準(zhǔn)備。

import { RunnableAssign, RunnableLambda } from "@langchain/core/runnables";

const getGeolocation = RunnableLambda.from(async (x: { ip: string }) => {
 //模擬一個(gè)API調(diào)用來獲取地理定位
 return { location: `Location for IP ${x.ip}` };
});

const runnableAssign = new RunnableAssign({ getGeolocation });

const res = await runnableAssign.invoke({ name: "John Doe", ip: "192.168.1.1" });

// 輸出結(jié)果: { name: "John Doe", ip: "192.168.1.1", getGeolocation: { location: "Location for IP 192.168.1.1" } }

RunnablePick

從輸入數(shù)據(jù)中選擇并提取特定字段,從而可以對(duì)相關(guān)信息進(jìn)行重點(diǎn)處理。

import { RunnablePick } from "@langchain/core/runnables";

const orderData = {
 orderId: "12345",
 customerEmail: "customer@example.com",
 items: [{ productId: "A1", quantity: 2 }],
 totalAmount: 99.99,
 shippingAddress: "123 Main St",
};

const receiptInfoRunnable = new RunnablePick(["orderId", "customerEmail", "totalAmount"]);

const res = await receiptInfoRunnable.invoke(orderData);

// 輸出結(jié)果: { orderId: '12345', customerEmail: 'customer@example.com', totalAmount: 99.99 }

RunnablePassthrough

不做任何更改地傳遞輸入數(shù)據(jù),這對(duì)于維護(hù)工作流中的數(shù)據(jù)完整性非常有用。

const chain = RunnableSequence.from([
 {
 question: new RunnablePassthrough(),
 context: async () => loadContextFromStore(),
 },
 prompt,
 llm,
 outputParser,
]);
const response = await chain.invoke(
 "I can pass a single string instead of an object since I'm using `RunnablePassthrough`."
);

RunnableMap

將轉(zhuǎn)換應(yīng)用于映射對(duì)象中的每個(gè)字段,從而實(shí)現(xiàn)對(duì)鍵值對(duì)的單獨(dú)處理。

const sensorDataRunnable = RunnableMap.from({
 temperature: RunnableLambda.from((data: { temp: number }) => `Temperature is ${data.temp}°C`),
 humidity: RunnableLambda.from((data: { humidity: number }) => `Humidity is ${data.humidity}%`),
});

const result = await sensorDataRunnable.invoke({ temp: 22, humidity: 45 });

// 輸出結(jié)果: { temperature: 'Temperature is 22°C', humidity: 'Humidity is 45%' }

序列和工作流組合類型

可按順序構(gòu)造和執(zhí)行任務(wù)的Runnable,支持創(chuàng)建復(fù)雜的工作流:

RunnableSequence

以線性方式鏈接多個(gè)Runnable,其中一個(gè)Runnable的輸出成為下一個(gè)Runnable的輸入,形成逐步處理管道結(jié)構(gòu)

const imageProcessingChain = RunnableSequence.from([
 readImageRunnable,
 resizeImageRunnable,
 applyFilterRunnable,
 saveImageRunnable,
]);

const result = await imageProcessingChain.invoke('path/to/input/image.jpg');

RunnableEach

將Runnable應(yīng)用于集合中的每個(gè)元素,類似于數(shù)組上的映射函數(shù),允許批處理。

import { RunnableEach, RunnableLambda } from "@langchain/core/runnables";

const personalizeEmail = RunnableLambda.from((name: string) => `Dear ${name}, we have an offer for you!`);
const sendEmail = emailSendingRunnable; // 假設(shè)這是在其他地方定義的

const emailChain = new RunnableEach({
 bound: personalizeEmail.pipe(sendEmail),
});

const result = await emailChain.invoke(["Alice", "Bob", "Carol"]);

//將電子郵件發(fā)送到:Alice, Bob, and Carol.

RunnableParallel

在同一輸入上同時(shí)執(zhí)行多個(gè)Runnable,從而實(shí)現(xiàn)并發(fā)處理以提高效率。

import { RunnableLambda, RunnableParallel } from "@langchain/core/runnables";

const calculateMean = RunnableLambda.from((data: number[]) => {
 return data.reduce((a, b) => a + b, 0) / data.length;
});

const calculateMedian = RunnableLambda.from((data: number[]) => {
 const sorted = data.slice().sort((a, b) => a - b);
 const mid = Math.floor(sorted.length / 2);
 return sorted.length % 2 !== 0 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2;
});

const calculateMode = RunnableLambda.from((data: number[]) => {
 const frequency: { [key: number]: number } = {};
 let maxFreq = 0;
 let modes: number[] = [];
 data.forEach((item) => {
 frequency[item] = (frequency[item] || 0) + 1;
 if (frequency[item] > maxFreq) {
 maxFreq = frequency[item];
 modes = [item];
 } else if (frequency[item] === maxFreq) {
 modes.push(item);
 }
 });
 return modes;
});

const analysisChain = RunnableParallel.from({
 mean: calculateMean,
 median: calculateMedian,
 mode: calculateMode,
});

const res = await analysisChain.invoke([1, 2, 2, 3, 4]);

// 輸出結(jié)果: { mean: 2.4, median: 2, mode: [2] }

錯(cuò)誤處理、彈性和配置類型

通過重試機(jī)制和回退選項(xiàng)增強(qiáng)穩(wěn)健性的Runnable

RunnableBinding

通過預(yù)設(shè)某些參數(shù)或配置創(chuàng)建自定義Runnable,允許針對(duì)特定上下文定制可重復(fù)使用的組件。

import { RunnableConfig, RunnableLambda } from "@langchain/core/runnables";

const queryDatabase = (query: string, config?: RunnableConfig) => {
 const dbConfig = config?.configurable?.dbConfig;
 //使用dbConfig建立連接并執(zhí)行查詢
 return `Executed query on ${dbConfig.host}: ${query}`;
};

const runnable = RunnableLambda.from(queryDatabase);

// 綁定用在不同環(huán)境中的配置
const prodRunnable = runnable.bind({ configurable: { dbConfig: { host: 'prod.db.example.com' } } });
const testRunnable = runnable.bind({ configurable: { dbConfig: { host: 'test.db.example.com' } } });

const result1 = await prodRunnable.invoke("SELECT * FROM users;");
// 輸出結(jié)果: "Executed query on prod.db.example.com: SELECT * FROM users;"

const result2 = await testRunnable.invoke("SELECT * FROM users;");
// 輸出結(jié)果: "Executed query on test.db.example.com: SELECT * FROM users;"

RunnableRetry

根據(jù)指定的重試策略在失敗時(shí)自動(dòng)重試Runnable,增強(qiáng)對(duì)瞬態(tài)錯(cuò)誤的恢復(fù)能力。

import { RunnableLambda } from "@langchain/core/runnables";

const fetchWeatherData = async (location: string): Promise<string> => {
 //模擬一個(gè)可能會(huì)失敗的API調(diào)用
 if (Math.random() < 0.7) {
 throw new Error("Network error");
 }
 return `Weather data for ${location}`;
};

const fetchWeatherLambda = RunnableLambda.from(fetchWeatherData);

//應(yīng)用重試邏輯
const fetchWeatherWithRetry = fetchWeatherLambda.withRetry({ stopAfterAttempt: 5 });

try {
 const res = await fetchWeatherWithRetry.invoke("New York");
 console.log(res);
} catch (error) {
 console.error("Failed to fetch weather data after retries:", error.message);
}

RunnableWithFallbacks

如果主Runnable失敗,則提供要執(zhí)行的替代Runnable,以確保工作流程可以繼續(xù)或正常降級(jí)。

import { RunnableLambda, RunnableWithFallbacks } from "@langchain/core/runnables";

const primaryDataSource = async (id: string): Promise<string> => {
 //模擬失敗故障
 throw new Error("Primary data source is unavailable");
};

const secondaryDataSource = async (id: string): Promise<string> => {
 return `Data for ${id} from secondary source`;
};

const primaryRunnable = RunnableLambda.from(primaryDataSource);
const fallbackRunnable = RunnableLambda.from(secondaryDataSource);

//帶回退的設(shè)置
const dataRunnable = primaryRunnable.withFallbacks([fallbackRunnable]);

const res = await dataRunnable.invoke("item123");
// 輸出結(jié)果: "Data for item123 from secondary source"

整合

在前面的部分中,我們探討了單個(gè)Runnable及其在構(gòu)建模塊化工作流中的作用?,F(xiàn)在,讓我們看看如何組合這些Runnable來創(chuàng)建全面的實(shí)際應(yīng)用程序。以下三個(gè)示例演示了如何集成多個(gè)Runnable來解決復(fù)雜問題。

示例1:智能文檔處理管道

現(xiàn)在,我們假定有一家公司希望自動(dòng)處理發(fā)票、收據(jù)和合同等傳入文檔。開發(fā)的軟件系統(tǒng)目標(biāo)是對(duì)文檔類型進(jìn)行分類、提取相關(guān)數(shù)據(jù)、驗(yàn)證數(shù)據(jù)并將其存儲(chǔ)在數(shù)據(jù)庫中,并且能夠妥善處理錯(cuò)誤,并在發(fā)生瞬時(shí)故障時(shí)重試操作。

使用的RunnableRunnableSequence、RouterRunnable、RunnableParallel、RunnableRetry、RunnableWithFallbacks、RunnableAssign、RunnableLambda,關(guān)鍵代碼如下:

import {
 RunnableSequence,
 RouterRunnable,
 RunnableLambda,
} from "@langchain/core/runnables";

//定義一個(gè)統(tǒng)一的輸出類型
type UnifiedOutput = {
 type: string;
 amount?: number;
 dueDate?: string;
 client?: string;
 parties?: string[];
 term?: string;
 total?: number;
 items?: string[];
};

// 步驟1:OCR處理(用函數(shù)模擬)
const ocrRunnable = RunnableLambda.from(async (imageBuffer: string) => {
 //模擬OCR處理
 return "Extracted text: Invoice for Acme Corp";
});

//步驟2:文件分類
const classifyDocument = RunnableLambda.from(async (text: string) => {
 //模擬文件分類
 if (text.includes("Invoice")) return "invoice";
 if (text.includes("Contract")) return "contract";
 return "receipt";
});

//步驟3:每種文檔類型的數(shù)據(jù)提取Runnables
const extractInvoiceData = RunnableLambda.from(
 async (text: string): Promise<UnifiedOutput> => {
 // 提取特定于發(fā)票的數(shù)據(jù)
 return {
 type: "invoice",
 amount: 1000,
 dueDate: "2024-12-31",
 client: "Acme Corp",
 };
 }
);

const extractContractData = RunnableLambda.from(
 async (text: string): Promise<UnifiedOutput> => {
 // 提取特定于合同的數(shù)據(jù)
 return {
 type: "contract",
 parties: ["Company A", "Company B"],
 term: "2 years",
 };
 }
);

const extractReceiptData = RunnableLambda.from(
 async (text: string): Promise<UnifiedOutput> => {
 //提取特定于收據(jù)的數(shù)據(jù)
 return { type: "receipt", total: 50, items: ["Item1", "Item2"] };
 }
);

const dataExtractionRouter = new RouterRunnable({
 runnables: {
 invoice: extractInvoiceData,
 contract: extractContractData,
 receipt: extractReceiptData,
 },
});

// 步驟5:數(shù)據(jù)驗(yàn)證
const validateData = RunnableLambda.from(async (data: UnifiedOutput) => {
 //執(zhí)行驗(yàn)證邏輯
 if (!data || !data.type)
 throw new Error("Validation failed: Data is missing or invalid");
 return { ...data, isValid: true };
});

// 步驟6:保存到數(shù)據(jù)庫(使用一個(gè)函數(shù)進(jìn)行模擬)
const saveToDatabase = RunnableLambda.from(async (data: UnifiedOutput) => {
 // 模擬保存到數(shù)據(jù)庫
 return `Data saved: ${JSON.stringify(data)}`;
});

//步驟7:構(gòu)建工作流序列
const documentProcessingWorkflow = RunnableSequence.from<string, any>([
 ocrRunnable,
 classifyDocument,
 dataExtractionRouter,
 validateData,
 saveToDatabase.withRetry({ stopAfterAttempt: 3 }),
]);

// 步驟8:添加帶有回退支持的錯(cuò)誤處理
const workflowWithFallback = documentProcessingWorkflow.withFallbacks({
 fallbacks: [
 RunnableLambda.from(async () => {
 return "An error occurred. Please try again later.";
 }),
 ],
});

//執(zhí)行工作流
(async () => {
 try {
 const result = await workflowWithFallback.invoke("Document image bytes");
 console.log(result);
 //期望的輸出結(jié)果: "Data saved: { type: 'invoice', amount: 1000, dueDate: '2024-12-31', client: 'Acme Corp', isValid: true }"
 } catch (error: any) {
 console.error("Failed to process document:", (error as Error).message);
 }
})();

在上面代碼中,工作流首先使用ocrRunnable將文檔圖像轉(zhuǎn)換為文本。提取的文本被分類為文檔類型(發(fā)票、合同或收據(jù))。RouterRunnable根據(jù)文檔類型將文本定向到適當(dāng)?shù)臄?shù)據(jù)提取Runnable。提取的數(shù)據(jù)經(jīng)過驗(yàn)證,然后保存到數(shù)據(jù)庫。RunnableRetry確保在發(fā)生瞬時(shí)故障時(shí)重試保存最多三次。如果任步驟失敗,RunnableWithFallbacks會(huì)提供回退消息以妥善處理錯(cuò)誤。

示例2:個(gè)性化推薦引擎

電子商務(wù)平臺(tái)希望根據(jù)用戶的瀏覽歷史和偏好為他們提供個(gè)性化的產(chǎn)品推薦。

使用的RunnableRunnableParallel、RunnableMap、RunnableBranch、RunnableWithFallbacks等,相關(guān)代碼如下:

import {
 RunnableParallel,
 RunnableMap,
 RunnableBranch,
 RunnableSequence,
 RunnableLambda,
} from "@langchain/core/runnables";

// 步驟1:并行地從多個(gè)來源獲取用戶數(shù)據(jù)
const fetchUserData = RunnableParallel.from({
 browsingHistory: RunnableLambda.from(async (userId) => {
 // 模擬獲取瀏覽歷史記錄
 return ["Item1", "Item2"];
 }),
 purchaseHistory: RunnableLambda.from(async (userId) => {
 // 模擬獲取購買歷史記錄
 return ["Item3"];
 }),
});

// 步驟2:映射到獲取的數(shù)據(jù)上以進(jìn)行處理
const processUserData = RunnableMap.from({
 browsingHistory: RunnableLambda.from((history: any[]) => {
 //處理瀏覽歷史記錄
 return history.map((item) => `Processed ${item}`);
 }),
 purchaseHistory: RunnableLambda.from((history: any[]) => {
 // 處理采購歷史記錄
 return history.map((item) => `Processed ${item}`);
 }),
});

//步驟3:定義推薦算法
const newUserRecommendations = RunnableLambda.from(async (user) => {
 // 針對(duì)新用戶的邏輯
 return ["Product A", "Product B", "Product C"];
});

const returningUserRecommendations = RunnableLambda.from(async (user) => {
 //基于歷史記錄返回用戶的邏輯
 return ["Product X", "Product Y", "Product Z"];
});

// 步驟4:基于用戶類型的分支處理
const recommendationBranch = RunnableBranch.from([
 [(user: any) => user.isNew, newUserRecommendations],
 returningUserRecommendations,
]);

//步驟5:創(chuàng)建一個(gè)回退推薦系統(tǒng)
const defaultRecommendations = RunnableLambda.from(async (user) => {
 //默認(rèn)的推薦
 return ["Default Product 1", "Default Product 2"];
});

const recommendationWithFallback = recommendationBranch.withFallbacks([
 defaultRecommendations,
]);

// 步驟6:對(duì)整個(gè)工作流進(jìn)行排隊(duì)操作
const recommendationWorkflow = RunnableSequence.from([
 fetchUserData,
 processUserData,
 (data) => ({ ...data, isNew: data.purchaseHistory.length === 0 }),
 recommendationWithFallback,
]);

// 用法
const userId = "user123";
const recommendations = recommendationWorkflow.invoke(userId);
// 輸出:基于用戶數(shù)據(jù)的個(gè)性化推薦

在上面代碼中,工作流首先使用RunnableParallel同時(shí)獲取用戶的瀏覽歷史、購買歷史和個(gè)人資料。然后,使用RunnableMap單獨(dú)處理每條數(shù)據(jù),為生成推薦做好準(zhǔn)備。

RunnableBranch根據(jù)用戶的個(gè)人資料決定使用哪種推薦算法:

  • 如果用戶是高級(jí)會(huì)員(isPremiumMembe為true),則使用premiumUserRecommendations。
  • 如果用戶沒有購買歷史(表示是新用戶),則使用newUserRecommendations。
  • 否則,默認(rèn)為regularUserRecommendations。

如果推薦過程中的任步驟失敗,RunnableWithFallbacks可確保系統(tǒng)提供一組默認(rèn)推薦,從而保持良好的用戶體驗(yàn)。

最后,RunnableSequence協(xié)調(diào)整個(gè)工作流,確保每個(gè)步驟都按正確的順序進(jìn)行。工作流通過使用userId調(diào)用來執(zhí)行,并根據(jù)用戶的數(shù)據(jù)輸出個(gè)性化推薦。

示例3:用于分析的數(shù)據(jù)處理管道

現(xiàn)在,我們假定有一家公司需要處理大型數(shù)據(jù)集以生成涉及數(shù)據(jù)清理、轉(zhuǎn)換、分析和可視化的分析報(bào)告。

使用的RunnableRunnableSequence、RunnableEach、RunnableRetry、RunnableBinding,關(guān)鍵代碼如下:

import {
 RunnableSequence,
 RunnableEach,
 RunnableLambda,
} from "@langchain/core/runnables";

// 步驟1:通過使用重試的方式定義數(shù)據(jù)獲取
const fetchData = RunnableLambda.from(async (source) => {
 // Simulate data fetching, which may fail
 if (Math.random() < 0.2) {
 throw new Error("Data fetch error");
 }
 return `Data from ${source}`;
}).withRetry({ stopAfterAttempt: 3 });

//步驟2:數(shù)據(jù)清理
const cleanData = RunnableLambda.from((data) => {
 // Perform data cleaning
 return `Cleaned ${data}`;
});

// 步驟3:數(shù)據(jù)轉(zhuǎn)換
const transformData = RunnableLambda.from((data) => {
 // Transform data
 return `Transformed ${data}`;
});

//步驟4:數(shù)據(jù)分析
const analyzeData = RunnableLambda.from((data) => {
 // Analyze data
 return `Analysis results of ${data}`;
});

//步驟5:數(shù)據(jù)可視化
const visualizeData = RunnableLambda.from((analysis) => {
 // Generate visualization
 return `Visualization of ${analysis}`;
});

//步驟6:步驟排隊(duì)
const dataProcessingSequence = RunnableSequence.from([
 cleanData,
 transformData,
 analyzeData,
 visualizeData,
]);

// 步驟7:處理多個(gè)數(shù)據(jù)源
const dataSources = ["Dataset A", "Dataset B", "Dataset C"];

const processAllData = new RunnableEach({
 bound: fetchData.pipe(dataProcessingSequence),
});

// 用法
const reports = processAllData.invoke(dataSources);
// 輸出:每個(gè)數(shù)據(jù)源的可視化結(jié)果數(shù)組

上述代碼中,此工作流處理來自不同來源的多個(gè)數(shù)據(jù)集的數(shù)據(jù)處理。首先,定義一個(gè)使用RunnableBinding綁定到特定數(shù)據(jù)源的fetchData可運(yùn)行程序。每個(gè)數(shù)據(jù)獲取操作都使用RunnableRetry包裝,通過重試最多三次來處理瞬時(shí)故障。

從每個(gè)源獲取的數(shù)據(jù)經(jīng)過RunnableSequence定義的一系列處理步驟:

  1. 數(shù)據(jù)清理:刪除或更正錯(cuò)誤數(shù)據(jù)。
  2. 數(shù)據(jù)轉(zhuǎn)換:將數(shù)據(jù)轉(zhuǎn)換為適合分析的格式。
  3. 數(shù)據(jù)分析:執(zhí)行分析計(jì)算。
  4. 數(shù)據(jù)可視化:生成針對(duì)分析結(jié)果的可視化表示。

在此,使用RunnableEach并行處理多個(gè)數(shù)據(jù)集。這個(gè)對(duì)象將相同的處理順序應(yīng)用于每個(gè)數(shù)據(jù)集。

結(jié)論

總體來看,LangChain中的Runnable架構(gòu)是構(gòu)建涉及大型語言模型LLM的復(fù)雜模塊化工作流的強(qiáng)大基礎(chǔ)。在本文中,我們探討了如何創(chuàng)建和組合各種Runnable以應(yīng)對(duì)各種挑戰(zhàn):

  • 路由和分支:利用RouterRunnable和RunnableBranch可以根據(jù)運(yùn)行時(shí)條件實(shí)現(xiàn)動(dòng)態(tài)執(zhí)行路徑。
  • 數(shù)據(jù)操作和分配:RunnableAssign、RunnablePick和RunnableMap等工具提供靈活的數(shù)據(jù)轉(zhuǎn)換功能,為后續(xù)處理步驟準(zhǔn)備輸入。
  • 序列和工作流組合:通過使用RunnableSequence、RunnableEach和RunnableParallel鏈接任務(wù),開發(fā)人員可以協(xié)調(diào)流程,無論它們是需要順序執(zhí)行還是并行處理。
  • 錯(cuò)誤處理和彈性:借助RunnableRetry和RunnableWithFallbacks,工作流可以優(yōu)雅地處理錯(cuò)誤并提供回退機(jī)制。

總之,Runnable提倡一種結(jié)構(gòu)化的方法來構(gòu)建LLM代理和鏈。最后,你計(jì)劃將LangChain集成到實(shí)際項(xiàng)目中時(shí),請(qǐng)認(rèn)真考慮Runnables如何增強(qiáng)的工作流,使其更靈活、更有彈性且更易于維護(hù)。

譯者介紹

朱先忠,51CTO社區(qū)編輯,51CTO專家博客、講師,濰坊一所高校計(jì)算機(jī)教師,自由編程界老兵一枚。

原文標(biāo)題:Guide to LangChain Runnable Architecture,作者:Pavlo Sobchuk

責(zé)任編輯:華軒 來源: 51CTO
相關(guān)推薦

2024-08-12 15:23:43

LangChain

2023-09-20 08:00:00

大語言模型代碼庫

2023-09-28 08:41:11

OpenAILLMLangChain

2024-03-07 09:15:57

2024-05-22 09:38:25

2025-04-01 08:38:25

模型上下文協(xié)議MCPLLM

2009-09-14 16:12:57

LINQ刪除記錄

2010-03-18 14:02:59

Java Runnab

2024-07-12 14:07:04

2022-05-17 11:06:44

數(shù)據(jù)庫MySQL系統(tǒng)

2009-01-11 10:30:00

負(fù)載均衡網(wǎng)絡(luò)故障

2009-07-21 16:49:41

整合iBatis和SpSqlMapClien

2009-09-09 14:04:18

C# XML解析XML解析方法

2010-09-27 17:07:54

2024-07-17 14:16:40

XMLPythonWeb開發(fā)

2023-02-23 08:00:00

Python機(jī)器學(xué)習(xí)編程代碼

2015-09-23 14:14:47

LinkedIn架構(gòu)解析

2011-09-14 10:09:02

負(fù)載均衡網(wǎng)站架構(gòu)緩存

2017-02-24 17:24:16

Etcd架構(gòu)分布式系統(tǒng)

2010-03-16 18:59:15

Java Runnab
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)