自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

依賴Kafka的Go單元測試?yán)?/h1>

開發(fā) 架構(gòu)
對于Kafka這樣的復(fù)雜系統(tǒng)來說,找到合適的fake object并不容易。因此,本文推薦使用容器作為fake object的策略,并分別介紹了使用testcontainers-go項目和使用docker-compose作為簡化創(chuàng)建和清理基于容器的依賴項的工具。

Kafka[1]是Apache基金會開源的一個分布式事件流處理平臺,是Java陣營(最初為Scala)中的一款殺手級應(yīng)用,其提供的高可靠性、高吞吐量和低延遲的數(shù)據(jù)傳輸能力,讓其到目前為止依舊是現(xiàn)代企業(yè)級應(yīng)用系統(tǒng)以及云原生應(yīng)用系統(tǒng)中使用的重要中間件。

在日常開發(fā)Go程序時,我們經(jīng)常會遇到一些依賴Kafka的代碼[2],如何對這些代碼進(jìn)行測試,尤其是單測是擺在Go開發(fā)者前面的一個現(xiàn)實問題!

有人說用mock,是個路子。但看過我的《單測時盡量用fake object[3]》一文的童鞋估計已經(jīng)走在了尋找kafka fake object的路上了!Kafka雖好,但身形碩大,不那么靈巧。找到一個合適的fake object不容易。在這篇文章中,我們就來聊聊如何測試那些依賴kafka的代碼,再往本質(zhì)一點說,就是和大家以找找那些合適的kafka fake object。

1. 尋找fake object的策略

在《單測時盡量用fake object[4]》一文中,我們提到過,如果測試的依賴提供了tiny版本或某些簡化版,我們可以直接使用這些版本作為fake object的候選,就像etcd提供了用于測試的自身簡化版的實現(xiàn)(embed)[5]那樣。

但Kafka并沒有提供tiny版本,我們也只能選擇《單測時盡量用fake object[6]》一文提到的另外一個策略,那就是利用容器來充當(dāng)fake object,這是目前能搞到任意依賴的fake object的最簡單路徑了。也許以后WASI(WebAssembly System Interface)[7]成熟了,讓wasm脫離瀏覽器并可以在本地系統(tǒng)上飛起,到時候換用wasm也不遲。

下面我們就按照使用容器的策略來找一找適合的kafka container。

2. testcontainers-go

我們第一站就來到了testcontainers-go[8]。testcontainers-go是一個Go語言開源項目,專門用于簡化創(chuàng)建和清理基于容器的依賴項,常用于Go項目的單元測試、自動化集成或冒煙測試中。通過testcontainers-go提供的易于使用的API,開發(fā)人員能夠以編程方式定義作為測試的一部分而運行的容器,并在測試完成時清理這些資源。

注:testcontainers[9]不僅提供Go API,它還覆蓋了主流的編程語言,包括:Java、.NET、Python、Node.js、Rust[10]等。

在幾個月之前,testcontainers-go[11]項目還沒有提供對Kafka的直接支持,我們需要自己使用testcontainers.GenericContainer來自定義并啟動kafka容器。2023年9月,以KRaft模式運行的Kafka容器才被首次引入testcontainers-go項目[12]。

目前testcontainers-go使用的kafka鏡像版本是confluentinc/confluent-local:7.5.0[13]。Confluent[14]是在kafka背后的那家公司,基于kafka提供商業(yè)化支持。今年初,Confluent還收購了Immerok,將apache的另外一個明星項目Flink招致麾下。

confluent-local[15]并不是一個流行的kafka鏡像,它只是一個使用KRaft模式的零配置的、包含Confluent Community RestProxy的Apache Kafka,并且鏡像是實驗性的,僅應(yīng)用于本地開發(fā)工作流,不應(yīng)該用在支持生產(chǎn)工作負(fù)載。

生產(chǎn)中最常用的開源kafka鏡像是confluentinc/cp-kafka鏡像[16],它是基于開源Kafka項目構(gòu)建的,但在此基礎(chǔ)上添加了一些額外的功能和工具,以提供更豐富的功能和更易于部署和管理的體驗。cp-kafka鏡像的版本號并非kafka的版本號,其對應(yīng)關(guān)系需要cp-kafka鏡像官網(wǎng)查詢。

另外一個開發(fā)領(lǐng)域常用的kafka鏡像是bitnami的kafka鏡像。Bitnami是一個提供各種開源軟件的預(yù)打包鏡像和應(yīng)用程序棧的公司。Bitnami Kafka鏡像是基于開源Kafka項目構(gòu)建的,是一個可用于快速部署和運行Kafka的Docker鏡像。Bitnami Kafka鏡像與其內(nèi)部的Kakfa的版本號保持一致。

下面我們就來看看如何使用testcontainers-go的kafka來作為依賴kafka的Go單元測試用例的fake object。

這第一個測試示例改編自testcontainers-go/kafka module的example_test.go:

// testcontainers/kafka_setup/kafka_test.go

package main

import (
    "context"
    "fmt"
    "testing"

 "github.com/testcontainers/testcontainers-go/modules/kafka"
)

func TestKafkaSetup(t *testing.T) {
    ctx := context.Background()

    kafkaContainer, err := kafka.RunContainer(ctx, kafka.WithClusterID("test-cluster"))
    if err != nil {
        panic(err)
    }

    // Clean up the container
    defer func() {
        if err := kafkaContainer.Terminate(ctx); err != nil {
            panic(err)
        }
    }()

    state, err := kafkaContainer.State(ctx)
    if err != nil {
        panic(err)
    }

    if kafkaContainer.ClusterID != "test-cluster" {
        t.Errorf("want test-cluster, actual %s", kafkaContainer.ClusterID)
    }
    if state.Running != true {
        t.Errorf("want true, actual %t", state.Running)
    }
    brokers, _ := kafkaContainer.Brokers(ctx)
    fmt.Printf("%q\n", brokers)
}

在這個例子中,我們直接調(diào)用kafka.RunContainer創(chuàng)建了一個名為test-cluster的kafka實例,如果沒有通過WithImage向RunContainer傳入自定義鏡像,那么默認(rèn)我們將啟動一個confluentinc/confluent-local:7.5.0的容器(注意:隨著時間變化,該默認(rèn)容器鏡像的版本也會隨之改變)。

通過RunContainer返回的kafka.KafkaContainer我們可以獲取到關(guān)于kafka容器的各種信息,比如上述代碼中的ClusterID、kafka Broker地址信息等。有了這些信息,我們后續(xù)便可以與以容器形式啟動的kafka建立連接并做數(shù)據(jù)的寫入和讀取操作了。

我們先來看這個測試的運行結(jié)果,與預(yù)期一致:

$ go test 
2023/12/16 21:45:52 github.com/testcontainers/testcontainers-go - Connected to docker: 
  ... ...
  Resolved Docker Host: unix:///var/run/docker.sock
  Resolved Docker Socket Path: /var/run/docker.sock
  Test SessionID: 19e47867b733f4da4f430d78961771ae3a1cc66c5deca083b4f6359c6d4b2468
  Test ProcessID: 41b9ef62-2617-4189-b23a-1bfa4c06dfec
2023/12/16 21:45:52 Creating container for image docker.io/testcontainers/ryuk:0.5.1
2023/12/16 21:45:53 Container created: 8f2240042c27
2023/12/16 21:45:53 Starting container: 8f2240042c27
2023/12/16 21:45:53 Container started: 8f2240042c27
2023/12/16 21:45:53 Waiting for container id 8f2240042c27 image: docker.io/testcontainers/ryuk:0.5.1. Waiting for: &{Port:8080/tcp timeout:<nil> PollInterval:100ms}
2023/12/16 21:45:53 Creating container for image confluentinc/confluent-local:7.5.0
2023/12/16 21:45:53 Container created: a39a495aed0b
2023/12/16 21:45:53 Starting container: a39a495aed0b
2023/12/16 21:45:53 Container started: a39a495aed0b
["localhost:1037"]
2023/12/16 21:45:58 Terminating container: a39a495aed0b
2023/12/16 21:45:58 Container terminated: a39a495aed0b
PASS
ok   demo 6.236s

接下來,在上面用例的基礎(chǔ)上,我們再來做一個Kafka連接以及數(shù)據(jù)讀寫測試:

// testcontainers/kafka_consumer_and_producer/kafka_test.go

package main

import (
 "bytes"
 "context"
 "errors"
 "net"
 "strconv"
 "testing"
 "time"

 "github.com/testcontainers/testcontainers-go/modules/kafka"

 kc "github.com/segmentio/kafka-go" // kafka client
)

func createTopics(brokers []string, topics ...string) error {
 // to create topics when auto.create.topics.enable='false'
 conn, err := kc.Dial("tcp", brokers[0])
 if err != nil {
  return err
 }
 defer conn.Close()

 controller, err := conn.Controller()
 if err != nil {
  return err
 }
 var controllerConn *kc.Conn
 controllerConn, err = kc.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
 if err != nil {
  return err
 }
 defer controllerConn.Close()

 var topicConfigs []kc.TopicConfig
 for _, topic := range topics {
  topicConfig := kc.TopicConfig{
   Topic:             topic,
   NumPartitions:     1,
   ReplicationFactor: 1,
  }
  topicConfigs = append(topicConfigs, topicConfig)
 }

 err = controllerConn.CreateTopics(topicConfigs...)
 if err != nil {
  return err
 }

 return nil
}

func newWriter(brokers []string, topic string) *kc.Writer {
 return &kc.Writer{
  Addr:                   kc.TCP(brokers...),
  Topic:                  topic,
  Balancer:               &kc.LeastBytes{},
  AllowAutoTopicCreation: true,
  RequiredAcks:           0,
 }
}

func newReader(brokers []string, topic string) *kc.Reader {
 return kc.NewReader(kc.ReaderConfig{
  Brokers:  brokers,
  Topic:    topic,
  GroupID:  "test-group",
  MaxBytes: 10e6, // 10MB
 })
}

func TestProducerAndConsumer(t *testing.T) {
 ctx := context.Background()

 kafkaContainer, err := kafka.RunContainer(ctx, kafka.WithClusterID("test-cluster"))
 if err != nil {
  t.Fatalf("want nil, actual %v\n", err)
 }

 // Clean up the container
 defer func() {
  if err := kafkaContainer.Terminate(ctx); err != nil {
   t.Fatalf("want nil, actual %v\n", err)
  }
 }()

 state, err := kafkaContainer.State(ctx)
 if err != nil {
  t.Fatalf("want nil, actual %v\n", err)
 }

 if state.Running != true {
  t.Errorf("want true, actual %t", state.Running)
 }

 brokers, err := kafkaContainer.Brokers(ctx)
 if err != nil {
  t.Fatalf("want nil, actual %v\n", err)
 }

 topic := "test-topic"
 w := newWriter(brokers, topic)
 defer w.Close()
 r := newReader(brokers, topic)
 defer r.Close()

 err = createTopics(brokers, topic)
 if err != nil {
  t.Fatalf("want nil, actual %v\n", err)
 }
 time.Sleep(5 * time.Second)

 messages := []kc.Message{
  {
   Key:   []byte("Key-A"),
   Value: []byte("Value-A"),
  },
  {
   Key:   []byte("Key-B"),
   Value: []byte("Value-B"),
  },
  {
   Key:   []byte("Key-C"),
   Value: []byte("Value-C"),
  },
  {
   Key:   []byte("Key-D"),
   Value: []byte("Value-D!"),
  },
 }

 const retries = 3
 for i := 0; i < retries; i++ {
  ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  defer cancel()

  // attempt to create topic prior to publishing the message
  err = w.WriteMessages(ctx, messages...)
  if errors.Is(err, kc.LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
   time.Sleep(time.Millisecond * 250)
   continue
  }

  if err != nil {
   t.Fatalf("want nil, actual %v\n", err)
  }
  break
 }

 var getMessages []kc.Message
 for i := 0; i < len(messages); i++ {
  m, err := r.ReadMessage(context.Background())
  if err != nil {
   t.Fatalf("want nil, actual %v\n", err)
  }
  getMessages = append(getMessages, m)
 }

 for i := 0; i < len(messages); i++ {
  if !bytes.Equal(getMessages[i].Key, messages[i].Key) {
   t.Errorf("want %s, actual %s\n", string(messages[i].Key), string(getMessages[i].Key))
  }
  if !bytes.Equal(getMessages[i].Value, messages[i].Value) {
   t.Errorf("want %s, actual %s\n", string(messages[i].Value), string(getMessages[i].Value))
  }
 }
}

我們使用segmentio/kafka-go這個客戶端[17]來實現(xiàn)kafka的讀寫。關(guān)于如何使用segmentio/kafka-go這個客戶端,可以參考我之前寫的《Go社區(qū)主流Kafka客戶端簡要對比[18]》。

這里我們在TestProducerAndConsumer這個用例中,先通過testcontainers-go的kafka.RunContainer啟動一個Kakfa實例,然后創(chuàng)建了一個topic: “test-topic”。我們在寫入消息前也可以不單獨創(chuàng)建這個“test-topic”,Kafka默認(rèn)啟用topic自動創(chuàng)建,并且segmentio/kafka-go的高級API:Writer也支持AllowAutoTopicCreation的設(shè)置。不過topic的創(chuàng)建需要一些時間,如果要在首次寫入消息時創(chuàng)建topic,此次寫入可能會失敗,需要retry。

向topic寫入一條消息(實際上是一個批量Message,包括四個key-value pair)后,我們調(diào)用ReadMessage從上述topic中讀取消息,并將讀取的消息與寫入的消息做比較。

注:近期發(fā)現(xiàn)kafka-go的一個可能導(dǎo)致內(nèi)存暴漲的問題[19],在kafka ack返回延遲變大的時候,可能觸發(fā)該問題。

下面是執(zhí)行該用例的輸出結(jié)果:

$ go test
2023/12/17 17:43:54 github.com/testcontainers/testcontainers-go - Connected to docker: 
  Server Version: 24.0.7
  API Version: 1.43
  Operating System: CentOS Linux 7 (Core)
  Total Memory: 30984 MB
  Resolved Docker Host: unix:///var/run/docker.sock
  Resolved Docker Socket Path: /var/run/docker.sock
  Test SessionID: f76fe611c753aa4ef1456285503b0935a29795e7c0fab2ea2588029929215a08
  Test ProcessID: 27f531ee-9b5f-4e4f-b5f0-468143871004
2023/12/17 17:43:54 Creating container for image docker.io/testcontainers/ryuk:0.5.1
2023/12/17 17:43:54 Container created: 577309098f4c
2023/12/17 17:43:54 Starting container: 577309098f4c
2023/12/17 17:43:54 Container started: 577309098f4c
2023/12/17 17:43:54 Waiting for container id 577309098f4c image: docker.io/testcontainers/ryuk:0.5.1. Waiting for: &{Port:8080/tcp timeout:<nil> PollInterval:100ms}
2023/12/17 17:43:54 Creating container for image confluentinc/confluent-local:7.5.0
2023/12/17 17:43:55 Container created: 1ee11e11742b
2023/12/17 17:43:55 Starting container: 1ee11e11742b
2023/12/17 17:43:55 Container started: 1ee11e11742b
2023/12/17 17:44:15 Terminating container: 1ee11e11742b
2023/12/17 17:44:15 Container terminated: 1ee11e11742b
PASS
ok   demo 21.505s

我們看到默認(rèn)情況下,testcontainer能滿足與kafka交互的基本需求,并且testcontainer提供了一系列Option(WithXXX)可以對container進(jìn)行定制,以滿足一些擴(kuò)展性的要求,但是這需要你對testcontainer提供的API有更全面的了解。

除了開箱即用的testcontainer之外,我們還可以使用另外一種方便的基于容器的技術(shù):docker-compose來定制和啟停我們需要的kafka image[20]。接下來,我們就來看看如何使用docker-compose建立fake kafka object。

3. 使用docker-compose建立fake kafka

3.1 一個基礎(chǔ)的基于docker-compose的fake kafka實例模板

這次我們使用bitnami提供的kafka鏡像,我們先建立一個“等價”于上面“testcontainers-go”提供的kafka module的kafka實例,下面是docker-compose.yml:

// docker-compose/bitnami/plaintext/docker-compose.yml

version: "2"

services:
  kafka:
    image: docker.io/bitnami/kafka:3.6
    network_mode: "host"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      # borrow from testcontainer
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_CFG_OFFSETS_TOPIC_NUM_PARTITIONS=1
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=1
      - KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS=0
      - KAFKA_CFG_LOG_FLUSH_INTERVAL_MESSAGES=9223372036854775807
volumes:
  kafka_data:
    driver: local

我們看到其中一些配置“借鑒”了testcontainers-go的kafka module,我們啟動一下該容器:

$ docker-compose up -d
[+] Running 2/2
 ? Volume "plaintext_kafka_data"  Created                                                                                    0.0s 
 ? Container plaintext-kafka-1    Started                                                                                    0.1s

依賴該容器的go測試代碼與前面的TestProducerAndConsumer差不多,只是在開始處去掉了container的創(chuàng)建過程:

// docker-compose/bitnami/plaintext/kafka_test.go

func TestProducerAndConsumer(t *testing.T) {
 brokers := []string{"localhost:9092"}
 topic := "test-topic"
 w := newWriter(brokers, topic)
 defer w.Close()
 r := newReader(brokers, topic)
 defer r.Close()

 err := createTopics(brokers, topic)
 if err != nil {
  t.Fatalf("want nil, actual %v\n", err)
 }
 time.Sleep(5 * time.Second)
 ... ...
}

運行該測試用例,我們看到預(yù)期的結(jié)果:

go test
write message ok  Value-A
write message ok  Value-B
write message ok  Value-C
write message ok  Value-D!
PASS
ok   demo 15.143s

不過對于單元測試來說,顯然我們不能手動來啟動和停止kafka container,我們需要為每個用例填上setup和teardown,這樣也能保證用例間的相互隔離,于是我們增加了一個docker_compose_helper.go文件,在這個文件中我們提供了一些幫助testcase啟停kafka的helper函數(shù):

// docker-compose/bitnami/plaintext/docker_compose_helper.go

package main

import (
 "fmt"
 "os/exec"
 "strings"
 "time"
)

// helpler function for operating docker container through docker-compose command

const (
 defaultCmd     = "docker-compose"
 defaultCfgFile = "docker-compose.yml"
)

func execCliCommand(cmd string, opts ...string) ([]byte, error) {
 cmds := cmd + " " + strings.Join(opts, " ")
 fmt.Println("exec command:", cmds)
 return exec.Command(cmd, opts...).CombinedOutput()
}

func execDockerComposeCommand(cmd string, cfgFile string, opts ...string) ([]byte, error) {
 var allOpts = []string{"-f", cfgFile}
 allOpts = append(allOpts, opts...)
 return execCliCommand(cmd, allOpts...)
}

func UpKakfa(composeCfgFile string) ([]byte, error) {
 b, err := execDockerComposeCommand(defaultCmd, composeCfgFile, "up", "-d")
 if err != nil {
  return nil, err
 }
 time.Sleep(10 * time.Second)
 return b, nil
}

func UpDefaultKakfa() ([]byte, error) {
 return UpKakfa(defaultCfgFile)
}

func DownKakfa(composeCfgFile string) ([]byte, error) {
    b, err := execDockerComposeCommand(defaultCmd, composeCfgFile, "down", "-v")
    if err != nil { 
        return nil, err
    }
    time.Sleep(10 * time.Second)
    return b, nil
}

func DownDefaultKakfa() ([]byte, error) {
 return DownKakfa(defaultCfgFile)
}

眼尖的童鞋可能看到:在UpKakfa和DownKafka函數(shù)中我們使用了硬編碼的“time.Sleep”來等待10s,通常在鏡像已經(jīng)pull到本地后這是有效的,但卻不是最精確地等待方式,testcontainers-go/wait[21]中提供了等待容器內(nèi)程序啟動完畢的多種策略,如果你想用更精確的等待方式,可以了解一下wait包。

基于helper函數(shù),我們改造一下TestProducerAndConsumer用例:

// docker-compose/bitnami/plaintext/kafka_test.go
func TestProducerAndConsumer(t *testing.T) {
    _, err := UpDefaultKakfa()
    if err != nil {
        t.Fatalf("want nil, actual %v\n", err)
    }

    t.Cleanup(func() {
        DownDefaultKakfa()
    })
 ... ...
}

我們在用例開始處通過UpDefaultKakfa使用docker-compose將kafka實例啟動起來,然后注冊了Cleanup函數(shù)[22],用于在test case執(zhí)行結(jié)束后銷毀kafka實例。

下面是新版用例的執(zhí)行結(jié)果:

$ go test
exec command: docker-compose -f docker-compose.yml up -d
write message ok  Value-A
write message ok  Value-B
write message ok  Value-C
write message ok  Value-D!
exec command: docker-compose -f docker-compose.yml down -v
PASS
ok   demo 36.402s

使用docker-compose的最大好處就是可以通過docker-compose.yml文件對要fake的object進(jìn)行靈活的定制,這種定制與testcontainers-go的差別就是你無需去研究testcontiners-go的API。

下面是使用tls連接與kafka建立連接并實現(xiàn)讀寫的示例。

3.2 建立一個基于TLS連接的fake kafka實例

Kafka的配置復(fù)雜是有目共睹的,為了建立一個基于TLS連接,我也是花了不少時間做“試驗”,尤其是listeners以及證書的配置,不下點苦功夫讀文檔還真是配不出來。

下面是一個基于bitnami/kafka鏡像配置出來的基于TLS安全通道上的kafka實例:

// docker-compose/bitnami/tls/docker-compose.yml

# config doc:  https://github.com/bitnami/containers/blob/main/bitnami/kafka/README.md

version: "2"

services:
  kafka:
    image: docker.io/bitnami/kafka:3.6
    network_mode: "host"
    #ports:
      #- "9092:9092"
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9094
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,SECURED://:9093,CONTROLLER://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=SECURED://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,SECURED:SSL,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SECURED
      # SSL settings
      - KAFKA_TLS_TYPE=PEM
      - KAFKA_TLS_CLIENT_AUTH=none
      - KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
      # borrow from testcontainer
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_CFG_OFFSETS_TOPIC_NUM_PARTITIONS=1
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=1
      - KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS=0
      - KAFKA_CFG_LOG_FLUSH_INTERVAL_MESSAGES=9223372036854775807
    volumes:
      # server.cert, server.key and ca.crt
      - "kafka_data:/bitnami"
      - "./kafka.keystore.pem:/opt/bitnami/kafka/config/certs/kafka.keystore.pem:ro"
      - "./kafka.keystore.key:/opt/bitnami/kafka/config/certs/kafka.keystore.key:ro"
      - "./kafka.truststore.pem:/opt/bitnami/kafka/config/certs/kafka.truststore.pem:ro"
volumes:
  kafka_data:
    driver: local

這里我們使用pem格式的證書和key,在上面配置中,volumes下面掛載的kafka.keystore.pem、kafka.keystore.key和kafka.truststore.pem分別對應(yīng)了以前在Go中常用的名字:server-cert.pem(服務(wù)端證書), server-key.pem(服務(wù)端私鑰)和ca-cert.pem(CA證書)。

這里整理了一個一鍵生成的腳本docker-compose/bitnami/tls/kafka-generate-cert.sh,我們執(zhí)行該腳本生成所有需要的證書并放到指定位置(遇到命令行提示,只需要一路回車即可):

$bash kafka-generate-cert.sh 
.........++++++
.............................++++++
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [XX]:
State or Province Name (full name) []:
Locality Name (eg, city) [Default City]:
Organization Name (eg, company) [Default Company Ltd]:
Organizational Unit Name (eg, section) []:
Common Name (eg, your name or your server's hostname) []:
Email Address []:

Please enter the following 'extra' attributes
to be sent with your certificate request
A challenge password []:
An optional company name []:
Signature ok
subject=/C=XX/L=Default City/O=Default Company Ltd
Getting Private key
.....................++++++
.........++++++
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [XX]:
State or Province Name (full name) []:
Locality Name (eg, city) [Default City]:
Organization Name (eg, company) [Default Company Ltd]:
Organizational Unit Name (eg, section) []:
Common Name (eg, your name or your server's hostname) []:
Email Address []:

Please enter the following 'extra' attributes
to be sent with your certificate request
A challenge password []:
An optional company name []:
Signature ok
subject=/C=XX/L=Default City/O=Default Company Ltd
Getting CA Private Key

接下來,我們來改造用例,使之支持以tls方式建立到kakfa的連接:

//docker-compose/bitnami/tls/kafka_test.go

func createTopics(brokers []string, tlsConfig *tls.Config, topics ...string) error {
    dialer := &kc.Dialer{
        Timeout:   10 * time.Second,
        DualStack: true,
        TLS:       tlsConfig,
    }

    conn, err := dialer.DialContext(context.Background(), "tcp", brokers[0])
    if err != nil {
        fmt.Println("creating topic: dialer dial error:", err)
        return err
    }
    defer conn.Close()
    fmt.Println("creating topic: dialer dial ok")
 ... ...
}

func newWriter(brokers []string, tlsConfig *tls.Config, topic string) *kc.Writer {
    w := &kc.Writer{
        Addr:                   kc.TCP(brokers...),
        Topic:                  topic,
        Balancer:               &kc.LeastBytes{},
        AllowAutoTopicCreation: true,
        Async:                  true,
        //RequiredAcks:           0,
        Completion: func(messages []kc.Message, err error) {
            for _, message := range messages {
                if err != nil {
                    fmt.Println("write message fail", err)
                } else {
                    fmt.Println("write message ok", string(message.Topic), string(message.Value))
                }
            }
        },
    }

    if tlsConfig != nil {
        w.Transport = &kc.Transport{
            TLS: tlsConfig,
        }
    }
    return w
}

func newReader(brokers []string, tlsConfig *tls.Config, topic string) *kc.Reader {
    dialer := &kc.Dialer{
        Timeout:   10 * time.Second,
        DualStack: true,
        TLS:       tlsConfig,
    }

    return kc.NewReader(kc.ReaderConfig{
        Dialer:   dialer,
        Brokers:  brokers,
        Topic:    topic,
        GroupID:  "test-group",
        MaxBytes: 10e6, // 10MB
    })
}

func TestProducerAndConsumer(t *testing.T) {
    var err error
    _, err = UpDefaultKakfa()
    if err != nil {
        t.Fatalf("want nil, actual %v\n", err)
    }

    t.Cleanup(func() {
        DownDefaultKakfa()
    })

    brokers := []string{"localhost:9093"}
    topic := "test-topic"

    tlsConfig, _ := newTLSConfig()
    w := newWriter(brokers, tlsConfig, topic)
    defer w.Close()
    r := newReader(brokers, tlsConfig, topic)
    defer r.Close()
    err = createTopics(brokers, tlsConfig, topic)
    if err != nil {
        fmt.Printf("create topic error: %v, but it may not affect the later action, just ignore it\n", err)
    }
    time.Sleep(5 * time.Second)
 ... ...
}

func newTLSConfig() (*tls.Config, error) {
    /*
       // 加載 CA 證書
       caCert, err := ioutil.ReadFile("/path/to/ca.crt")
       if err != nil {
               return nil, err
       }

       // 加載客戶端證書和私鑰
       cert, err := tls.LoadX509KeyPair("/path/to/client.crt", "/path/to/client.key")
       if err != nil {
               return nil, err
       }

       // 創(chuàng)建 CertPool 并添加 CA 證書
       caCertPool := x509.NewCertPool()
       caCertPool.AppendCertsFromPEM(caCert)
    */
    // 創(chuàng)建并返回 TLS 配置
    return &tls.Config{
        //RootCAs:      caCertPool,
        //Certificates: []tls.Certificate{cert},
        InsecureSkipVerify: true,
    }, nil
}

在上述代碼中,我們按照segmentio/kafka-go為createTopics、newWriter和newReader都加上了tls.Config參數(shù),此外在測試用例中,我們用newTLSConfig創(chuàng)建一個tls.Config的實例,在這里我們一切簡化處理,采用InsecureSkipVerify=true的方式與kafka broker服務(wù)端進(jìn)行握手,既不驗證服務(wù)端證書,也不做雙向認(rèn)證(mutual TLS)。

下面是修改代碼后的測試用例執(zhí)行結(jié)果:

$ go test
exec command: docker-compose -f docker-compose.yml up -d
creating topic: dialer dial ok
creating topic: get controller ok
creating topic: dial control listener ok
create topic error: EOF, but it may not affect the later action, just ignore it
write message error: [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker
write message ok  Value-A
write message ok  Value-B
write message ok  Value-C
write message ok  Value-D!
exec command: docker-compose -f docker-compose.yml down -v
PASS
ok   demo 38.473s

這里我們看到:createTopics雖然連接kafka的各個listener都o(jì)k,但調(diào)用topic創(chuàng)建時,返回EOF,但這的確不影響后續(xù)action的執(zhí)行,不確定這是segmentio/kafka-go的問題,還是kafka實例的問題。另外首次寫入消息時,也因為topic或partition未建立而失敗,retry后消息正常寫入。

通過這個例子我們看到,基于docker-compose建立fake object有著更廣泛的靈活性,如果做好容器啟動和停止的精準(zhǔn)wait機制的話,我可能會更多選擇這種方式。

4. 小結(jié)

本文介紹了如何在Go編程中進(jìn)行依賴Kafka的單元測試,并探討了尋找適合的Kafka fake object的策略。

對于Kafka這樣的復(fù)雜系統(tǒng)來說,找到合適的fake object并不容易。因此,本文推薦使用容器作為fake object的策略,并分別介紹了使用testcontainers-go項目和使用docker-compose作為簡化創(chuàng)建和清理基于容器的依賴項的工具。相對于剛剛加入testcontainers-go項目沒多久的kafka module而言,使用docker-compose自定義fake object更加靈活一些。但無論哪種方法,開發(fā)人員都需要對kafka的配置有一個較為整體和深入的理解。

文中主要聚焦使用testcontainers-go和docker-compose建立fake kafka的過程,而用例并沒有建立明確的sut(被測目標(biāo)),比如針對某個函數(shù)的白盒單元測試。

文本涉及的源碼可以在這里[23]下載。

參考資料

[1] Kafka: https://kafka.apache.org

[2] 依賴Kafka的代碼: https://tonybai.com/2023/09/04/slog-in-action-file-logging-rotation-and-kafka-integration/

[3] 單測時盡量用fake object: https://tonybai.com/2023/04/20/provide-fake-object-for-external-collaborators/

[4] 單測時盡量用fake object: https://tonybai.com/2023/04/20/provide-fake-object-for-external-collaborators/

[5] 用于測試的自身簡化版的實現(xiàn)(embed): https://github.com/etcd-io/etcd/blob/main/tests/integration/embed

[6] 單測時盡量用fake object: https://tonybai.com/2023/04/20/provide-fake-object-for-external-collaborators/

[7] WASI(WebAssembly System Interface): https://wasi.dev/

[8] testcontainers-go: https://golang.testcontainers.org/

[9] testcontainers: https://testcontainers.com

[10] Rust: https://tonybai.com/2023/02/22/rust-vs-go-in-2023/

[11] testcontainers-go: https://github.com/testcontainers/testcontainers-go/

[12] 以KRaft模式運行的Kafka容器才被首次引入testcontainers-go項目: https://github.com/testcontainers/testcontainers-go/pull/1610

[13] confluentinc/confluent-local:7.5.0: https://hub.docker.com/r/confluentinc/confluent-local

[14] Confluent: https://www.confluent.io

[15] confluent-local: https://hub.docker.com/r/confluentinc/confluent-local

[16] confluentinc/cp-kafka鏡像: https://hub.docker.com/r/confluentinc/cp-kafka

[17] 使用segmentio/kafka-go這個客戶端: https://tonybai.com/2022/03/28/the-comparison-of-the-go-community-leading-kakfa-clients

[18] Go社區(qū)主流Kafka客戶端簡要對比: https://tonybai.com/2022/03/28/the-comparison-of-the-go-community-leading-kakfa-clients

[19] 發(fā)現(xiàn)kafka-go的一個可能導(dǎo)致內(nèi)存暴漲的問題: https://github.com/segmentio/kafka-go/pull/1117

[20] docker-compose來定制和啟停我們需要的kafka image: https://tonybai.com/2021/11/26/build-all-in-one-runtime-environment-with-docker-compose

[21] testcontainers-go/wait: https://pkg.go.dev/github.com/testcontainers/testcontainers-go@v0.26.0/wait

[22] 注冊了Cleanup函數(shù): https://tonybai.com/2020/03/08/some-changes-in-go-1-14/

[23] 這里: https://github.com/bigwhite/experiments/tree/master/unit-testing-deps-on-kafka

[24] Gopher部落知識星球: https://public.zsxq.com/groups/51284458844544

[25] 鏈接地址: https://m.do.co/c/bff6eed92687

責(zé)任編輯:武曉燕 來源: TonyBai
相關(guān)推薦

2022-04-08 09:01:56

腳本Go應(yīng)用單元

2017-01-14 23:42:49

單元測試框架軟件測試

2023-10-28 10:10:41

2023-07-26 08:58:45

Golang單元測試

2011-05-16 16:52:09

單元測試徹底測試

2021-06-15 08:08:47

Java單元測試

2017-01-14 23:26:17

單元測試JUnit測試

2017-01-16 12:12:29

單元測試JUnit

2011-06-14 15:56:42

單元測試

2020-08-18 08:10:02

單元測試Java

2022-05-12 09:37:03

測試JUnit開發(fā)

2017-03-23 16:02:10

Mock技術(shù)單元測試

2024-10-16 16:09:32

2021-05-05 11:38:40

TestNGPowerMock單元測試

2011-07-04 18:16:42

單元測試

2020-05-07 17:30:49

開發(fā)iOS技術(shù)

2023-11-25 09:41:34

GogRPCHandler

2011-06-14 15:39:46

單元測試

2012-05-21 09:41:54

XcodeiOS單元測試

2011-04-18 13:20:40

單元測試軟件測試
點贊
收藏

51CTO技術(shù)棧公眾號