自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

一起學Elasticsearch-Pipeline

數(shù)據(jù)庫 其他數(shù)據(jù)庫
Elasticsearch Pipeline作為Elasticsearch中強大而靈活的功能之一,為用戶提供了處理數(shù)據(jù)的機制,可以在數(shù)據(jù)索引之前或之后應(yīng)用多種處理步驟,例如數(shù)據(jù)預處理、轉(zhuǎn)換、清洗、分析等操作。

在現(xiàn)代的數(shù)據(jù)處理和分析場景中,數(shù)據(jù)不僅需要被存儲和檢索,還需要經(jīng)過各種復雜的轉(zhuǎn)換、處理和豐富,以滿足業(yè)務(wù)需求和提高數(shù)據(jù)價值。

Elasticsearch Pipeline作為Elasticsearch中強大而靈活的功能之一,為用戶提供了處理數(shù)據(jù)的機制,可以在數(shù)據(jù)索引之前或之后應(yīng)用多種處理步驟,例如數(shù)據(jù)預處理、轉(zhuǎn)換、清洗、分析等操作。

使用場景

Elasticsearch Pipeline 可以用于多種實際場景,其中包括但不限于:

  • 數(shù)據(jù)預處理:對原始數(shù)據(jù)進行清洗、標準化、去除噪聲等操作,保證數(shù)據(jù)質(zhì)量和一致性。
  • 數(shù)據(jù)轉(zhuǎn)換:將數(shù)據(jù)轉(zhuǎn)換為更加符合業(yè)務(wù)需求的形式,例如字段映射、格式轉(zhuǎn)換、數(shù)據(jù)合并等。
  • 日志處理:實時日志數(shù)據(jù)的解析、提取關(guān)鍵信息、計算指標、數(shù)據(jù)聚合等操作。
  • 數(shù)據(jù)安全:對敏感數(shù)據(jù)進行脫敏處理、數(shù)據(jù)屏蔽、權(quán)限控制等操作,確保數(shù)據(jù)安全性。

具體使用

要實現(xiàn)Elasticsearch Pipeline功能,需要在節(jié)點上進行以下設(shè)置:

啟用Ingest節(jié)點:確保節(jié)點上已啟用Ingest處理模塊(默認情況下,每個節(jié)點都是Ingest Node),因為Pipeline是在Ingest處理階段應(yīng)用的。可以在elasticsearch.yml配置文件中添加以下設(shè)置來啟用Ingest節(jié)點:

node.ingest: true

配置Pipeline的最大值:如果需要創(chuàng)建復雜的Pipeline或者包含大量處理步驟的Pipeline,可能需要調(diào)整默認的Pipeline容量限制??梢酝ㄟ^以下方式在elasticsearch.yml配置文件中設(shè)置Pipeline的最大值:

ingest.max_pipelines: 1000

檢查內(nèi)存和資源使用:確保節(jié)點具有足夠的內(nèi)存和資源來支持Pipeline的運行,避免因為資源不足而導致Pipeline執(zhí)行失敗或性能下降。

對上述參數(shù)進行合理的配置后,就可以定義 Pipeline,并將其應(yīng)用于索引文檔了。

下面是一個簡單的示例代碼,演示如何創(chuàng)建和使用Pipeline:

創(chuàng)建Pipeline

PUT _ingest/pipeline/my_pipeline
{
  "description" : "My custom pipeline",
  "processors" : [
    {
      "set": {
        "field": "new_field",
        "value": "example"
      }
    },
    {
      "uppercase": {
        "field": "message"
      }
    }
  ]
}

上面的代碼定義了一個名為 my_pipeline 的Pipeline,包含兩個處理步驟:

  • set 處理器:將字段 new_field 設(shè)置為固定值 example。
  • uppercase 處理器:將字段 message 中的文本轉(zhuǎn)換為大寫。

一個Elasticsearch Pipeline通常由以下幾個主要部分組成:

  • 描述(Description):Pipeline的描述部分包含對Pipeline的簡要說明或注釋,用于幫助其他人理解該Pipeline的作用和功能。
  • 處理器(Processors):Pipeline的核心是處理器,處理器定義了對文檔進行的具體處理步驟。每個處理器都執(zhí)行特定的操作,例如設(shè)置字段值、重命名字段、轉(zhuǎn)換數(shù)據(jù)、條件判斷等。處理器按照在Pipeline中的順序依次執(zhí)行,以完成對文檔的處理。
  • 條件(Conditions):可選部分,條件定義了觸發(fā)Pipeline應(yīng)用的條件。只有當條件滿足時,Pipeline才會被應(yīng)用到相應(yīng)的文檔上。條件可以基于文檔內(nèi)容、字段值、索引信息等進行判斷。
  • 內(nèi)置變量(Built-in Variables):在處理器中可以使用一些內(nèi)置變量來引用文檔數(shù)據(jù)或上下文信息,并在處理過程中進行操作。例如,_index表示當前文檔所屬的索引名稱,_ingest.timestamp表示處理器執(zhí)行的時間戳等。
  • 標簽(Tags):可選部分,為Pipeline添加標簽,用于標識和分類不同類型的Pipeline。

這些部分共同構(gòu)成了一個完整的Elasticsearch Pipeline,通過定義和配置這些部分,可以實現(xiàn)對文檔數(shù)據(jù)的靈活處理和轉(zhuǎn)換。

應(yīng)用Pipeline

一旦Pipeline被定義,可以在索引文檔時指定應(yīng)用該Pipeline:

POST my_index/_doc/1?pipeline=my_pipeline
{
  "message": "Hello, World!"
}

異常處理

在Elasticsearch Pipeline 中處理異常情況通常通過 on_failure 處理器來實現(xiàn)。下面是一個示例代碼,演示如何使用 on_failure 處理器來處理異常情況:

PUT _ingest/pipeline/my_pipeline
{
  "description": "Pipeline with error handling",
  "processors": [
    {
      "set": {
        "field": "new_field",
        "value": "{{field_with_value}}"
      }
    },
    {
      "on_failure": [
        {
          "set": {
            "field": "error_message",
            "value": "{{_ingest.on_failure_message}}"
          }
        }
      ]
    }
  ]
}

在上面的示例中,定義了一個名為 my_pipeline 的 Pipeline,其中包含兩個處理器:

  • 第一個處理器使用 set 處理器來設(shè)置一個新的字段 new_field 的值為另一個字段 field_with_value 的值。
  • 第二個處理器是一個 on_failure 處理器,在前一個處理器執(zhí)行失敗時會被觸發(fā)。這里使用 on_failure_message 變量來獲取失敗的原因,并將其設(shè)置到一個新的字段 error_message 中。

當?shù)谝粋€處理器執(zhí)行失敗時,第二個處理器會被觸發(fā),并將失敗信息存儲到 error_message 字段中,以便后續(xù)處理或記錄日志。這樣可以幫助我們更好地處理異常情況,確保數(shù)據(jù)處理的穩(wěn)定性。

如果是Pipeline級別的錯誤,可以通過全局設(shè)置on_failure來處理整個Pipeline執(zhí)行過程中的異常情況:

PUT _ingest/pipeline/my_pipeline
{
  "description": "Pipeline with global error handling",
  "on_failure": [
    {
      "set": {
        "field": "error_message",
        "value": "{{_ingest.on_failure_message}}"
      }
    }
  ],
  "processors": [
    {
      "set": {
        "field": "new_field",
        "value": "{{field_with_value}}"
      }
    }
  ]
}

在上述示例中,Pipeline my_pipeline 中定義了一個全局的on_failure處理器,在整個Pipeline執(zhí)行過程中發(fā)生異常時會觸發(fā)。當任何處理器執(zhí)行失敗時,全局on_failure處理器將被調(diào)用,并將失敗消息存儲到error_message字段中。

通過設(shè)置全局的on_failure處理器,可以統(tǒng)一處理整個Pipeline中任何處理器可能出現(xiàn)的異常情況,提高數(shù)據(jù)處理的穩(wěn)定性和可靠性。這樣即便是Pipeline級別的錯誤,也能得到有效的處理和記錄,幫助排查問題并保證數(shù)據(jù)處理流程的正常運行。

為索引設(shè)置默認Pipeline

從 Elasticsearch 6.5.x 開始,引入了一個名為 index.default_pipeline 的新索引設(shè)置。這僅僅意味著所有攝取的文檔都將由默認管道進行預處理:

PUT my_index
{
  "settings": {
    "default_pipeline": "add_last_update_time"
  }
}

內(nèi)置Processors

Elasticsearch內(nèi)置的Processors提供了各種功能,用于在Ingest Pipeline中對文檔進行處理。以下是一些常用的內(nèi)置Processors及其作用:

  • Set Processor:設(shè)置字段的固定值或通過表達式計算值。
  • Grok Processor:解析文本字段并提取結(jié)構(gòu)化數(shù)據(jù)。
  • Date Processor:解析日期字段。
  • Convert Processor:轉(zhuǎn)換字段類型。
  • Remove Processor:刪除指定字段。
  • Split Processor:根據(jù)分隔符拆分字段。
  • GeoIP Processor:根據(jù)IP地址查找地理位置信息。
  • User Agent Processor:解析User-Agent字段。

Pipeline API

以下是有關(guān)Elasticsearch Pipeline API的簡要介紹和示例代碼:

  • Put Pipeline API:用于創(chuàng)建或更新Pipeline。
PUT /_ingest/pipeline/my_pipeline
{
  "description": "My custom pipeline",
  "processors": [
    {
      "set": {
        "field": "new_field",
        "value": "default"
      }
    }
  ]
}
  • Get Pipeline API:用于獲取Pipeline的信息。
GET /_ingest/pipeline/my_pipeline
  • Delete Pipeline API:用于刪除Pipeline。
DELETE /_ingest/pipeline/my_pipeline
  • Simulate Pipeline API:用于模擬Pipeline對文檔的處理效果。
POST /_ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "set": {
          "field": "new_field",
          "value": "default"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "my_field": "my_value"
      }
    }
  ]
}
  • Manage Pipelines in Index Templates:可以在索引模板中定義Pipeline。
PUT /_index_template/my_template
{
  "index_patterns": ["my_index*"],
  "composed_of": ["my_pipeline"],
  "priority": 1
}

使用建議

在使用Elasticsearch Pipeline時,有幾點建議可以幫助提高效率和準確性:

  • 測試和驗證:在應(yīng)用Pipeline之前,務(wù)必進行充分的測試和驗證,確保處理步驟的準確性和穩(wěn)定性。
  • 監(jiān)控和調(diào)優(yōu):定期監(jiān)控Pipeline的性能和效果,根據(jù)實際情況進行調(diào)優(yōu)和優(yōu)化,以提高數(shù)據(jù)處理和索引效率。
  • 復用Pipeline:針對相似的數(shù)據(jù)處理需求,可以設(shè)計通用的Pipeline,以便在多個索引中重復使用,提高代碼復用性和維護性。
  • 合理使用條件:根據(jù)具體需求選擇合適的條件觸發(fā)Pipeline的應(yīng)用,避免不必要的處理過程,提高系統(tǒng)性能。
責任編輯:武曉燕 來源: Java隨想錄
相關(guān)推薦

2023-11-13 22:27:53

Mapping數(shù)據(jù)庫

2023-11-13 12:48:32

語言DSL

2023-11-30 15:23:07

聚合查詢數(shù)據(jù)分析

2023-12-26 12:12:57

檢索調(diào)優(yōu)Scripting場景

2022-11-29 16:35:02

Tetris鴻蒙

2022-12-02 14:20:09

Tetris鴻蒙

2022-11-14 17:01:34

游戲開發(fā)畫布功能

2023-03-30 09:32:27

2023-02-28 07:28:50

Spritepixijs

2023-04-26 07:42:16

WebGL圖元的類型

2023-05-04 08:48:42

WebGL復合矩陣

2023-06-26 15:14:19

WebGL紋理對象學習

2023-04-12 07:46:24

JavaScriptWebGL

2023-03-29 07:31:09

WebGL坐標系

2023-04-11 07:48:32

WebGLCanvas

2023-03-02 07:44:39

pixijsWebGL

2023-04-13 07:45:15

WebGL片元著色器

2023-05-31 20:10:03

WebGL繪制立方體

2023-05-16 07:44:03

紋理映射WebGL

2023-02-22 09:27:31

CanvasWebGL
點贊
收藏

51CTO技術(shù)棧公眾號