自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Golang 語言中 kafka 客戶端庫 Sarama

開發(fā) 前端 Kafka
Apache Kafka 是一款開源的消息引擎系統(tǒng)。它在項(xiàng)目中的作用主要是削峰填谷和解耦。本文我們只介紹 Apache Kafka 的 Golang 客戶端庫 Sarama。Sarama 是 MIT 許可的 Apache Kafka 0.8 及更高版本的 Golang 客戶端庫。

01、介紹

Apache Kafka 是一款開源的消息引擎系統(tǒng)。它在項(xiàng)目中的作用主要是削峰填谷和解耦。本文我們只介紹 Apache Kafka 的 Golang 客戶端庫 Sarama。Sarama 是 MIT 許可的 Apache Kafka 0.8 及更高版本的 Golang 客戶端庫。

如果讀者朋友對(duì) Apache Kafka 服務(wù)端還不了解,建議先閱讀官方文檔中的入門部分,本文使用的版本是 Apache Kafka 2.8。

[[397879]]

02、生產(chǎn)者

我們可以使用 Sarama 庫的 AsyncProducer 或 SyncProducer 生產(chǎn)消息。在大多數(shù)情況下首選使用 AsyncProducer 生產(chǎn)消息。它通過一個(gè) channel 接收消息,并在后臺(tái)盡可能高效的異步生產(chǎn)消息。

SyncProducer 發(fā)送 Kafka 消息后阻塞,直到接收到 ACK 確認(rèn)。SyncProducer 有兩個(gè)警告:它通常效率較低,并且實(shí)際的耐用性保證取決于 Producer.RequiredAcks 的配置值。在某些配置中,有時(shí)仍會(huì)丟失由 SyncProducer 確認(rèn)的消息,但是使用比較簡單。

為了讀者朋友們?nèi)菀桌斫?,本文我們介紹 SyncProducer 作為生產(chǎn)者的使用方式。如果讀者朋友想了解 AsyncProducer 作為生產(chǎn)者的使用方式,請參考官方文檔。

使用 SyncProducer 作為生產(chǎn)者的示例代碼:

  1. func sendMessage (brokerAddr []string, config *sarama.Config, topic string, value sarama.Encoder) { 
  2.  producer, err := sarama.NewSyncProducer(brokerAddr, config) 
  3.  if err != nil { 
  4.   fmt.Println(err) 
  5.   return 
  6.  } 
  7.  defer func() { 
  8.   if err = producer.Close(); err != nil { 
  9.    fmt.Println(err) 
  10.    return 
  11.   } 
  12.  }() 
  13.  msg := &sarama.ProducerMessage{ 
  14.   Topic: topic, 
  15.   Value: value, 
  16.  } 
  17.  partition, offset, err := producer.SendMessage(msg) 
  18.  if err != nil { 
  19.   fmt.Println(err) 
  20.   return 
  21.  } 
  22.  fmt.Printf("partition:%d offset:%d\n", partition, offset) 

閱讀上面這段代碼,我們調(diào)用 NewSyncProducer() 創(chuàng)建一個(gè)新的 SyncProducer,給定 broker 地址和配置信息。調(diào)用 SendMessage() 生產(chǎn)給定的消息,并且僅在生產(chǎn)成功或失敗時(shí)返回。它將返回分區(qū)(Partition)和生產(chǎn)的消息的偏移量(Offset),如果消息生產(chǎn)失敗,則返回錯(cuò)誤。

需要注意的是,為了避免泄露,必須在生產(chǎn)者上調(diào)用 Close(),因?yàn)楫?dāng)它超出范圍時(shí),可能不會(huì)自動(dòng)垃圾回收。

03、消費(fèi)者

我們可以使用 Sarama 庫的消費(fèi)者 Consumer 或消費(fèi)者組 ConsumerGroup API 消費(fèi)消息。為了讀者朋友們?nèi)菀桌斫?,本文我們介紹使用 Consumer 消費(fèi)消息。

Consumer 管理 PartitionConsumers,該 PartitionConsumers 處理來自 brokers 的 Kafka 消息。

Consumer 消費(fèi)消息的示例代碼:

  1. func consumer (brokenAddr []string, topic string, partition int32, offset int64) { 
  2.  consumer, err := sarama.NewConsumer(brokenAddr, nil) 
  3.  if err != nil { 
  4.   fmt.Println(err) 
  5.   return 
  6.  } 
  7.  defer func() { 
  8.   if err = consumer.Close(); err != nil { 
  9.    fmt.Println(err) 
  10.    return 
  11.   } 
  12.  }() 
  13.  partitionConsumer, err := consumer.ConsumePartition(topic, partition, offset) 
  14.  if err != nil { 
  15.   fmt.Println(err) 
  16.   return 
  17.  } 
  18.  defer func() { 
  19.   if err = partitionConsumer.Close(); err != nil { 
  20.    fmt.Println(err) 
  21.    return 
  22.   } 
  23.  }() 
  24.  for msg := range partitionConsumer.Messages() { 
  25.   fmt.Printf("partition:%d offset:%d key:%s val:%s\n", msg.Partition, msg.Offset, msg.Key, msg.Value) 
  26.  } 

閱讀上面這段代碼,我們調(diào)用 NewConsumer() 創(chuàng)建一個(gè)新的 consumer,給定 broker 地址和配置信息。調(diào)用 ConsumePartition() 創(chuàng)建 PartitionConsumer,給定 topic、partition 和 offset。PartitionConsumer 處理來自給定 topic 和 partition 的 Kafka 消息。

需要注意的是,為了防止泄露,必須調(diào)用 consumer 和 partitionConsumer 的 Close(),因?yàn)楫?dāng)它超出范圍時(shí),可能不會(huì)自動(dòng)垃圾回收。

04、總結(jié)

本文主要介紹如何使用 Apache Kafka 的 Golang 語言客戶端庫 Sarama 生產(chǎn)和消費(fèi) Kafka 消息。關(guān)于生產(chǎn)者和消費(fèi)者,分別列舉了一個(gè)簡單示例。除此之外,Sarama 庫還提供了很多其它 Api,感興趣的讀者朋友可以閱讀官方文檔了解更多。

責(zé)任編輯:未麗燕 來源: Golang語言開發(fā)棧
相關(guān)推薦

2021-10-18 05:00:38

語言GoRequestHTTP

2014-08-11 16:35:35

KafkaJava客戶端

2024-02-05 08:50:57

Golang標(biāo)準(zhǔn)庫客戶端

2022-02-20 23:15:46

gRPCGolang語言

2021-08-01 23:18:21

Redis Golang命令

2021-10-26 09:26:21

編程技能開發(fā)

2011-08-17 10:10:59

2021-09-22 15:46:29

虛擬桌面瘦客戶端胖客戶端

2022-09-23 08:02:42

Kafka消息緩存

2010-05-31 10:11:32

瘦客戶端

2011-10-26 13:17:05

2021-06-07 23:19:44

Golang語言 Defer

2021-04-28 09:02:48

Golang語言Context

2020-03-19 08:00:00

客戶端KubernetesAPI

2011-03-02 14:36:24

Filezilla客戶端

2010-12-21 11:03:15

獲取客戶端證書

2011-03-24 13:00:31

配置nagios客戶端

2011-03-21 14:53:36

Nagios監(jiān)控Linux

2009-03-04 10:27:50

客戶端組件桌面虛擬化Xendesktop

2011-04-06 14:24:20

Nagios監(jiān)控Linux
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)