Go 語言微服務(wù)框架 Kratos 集成第三方庫 kafka-go 操作消息隊列 Kafka
1.介紹
Go 語言微服務(wù)框架 Kratos 不限制使用任何第三方庫,Go 語言操作消息隊列 Kafka 有很多優(yōu)秀的第三方庫,比如 sarama 和 kafka-go,我們在之前的文章中介紹過 Go 語言怎么使用 sarama 操作消息隊列 Kafka。
本文我們介紹 Go 微服務(wù)框架 Kratos 怎么集成第三方庫 kafka-go[1] 操作消息隊列 Kafka。
2.Kratos 集成第三方庫 kafka-go
我們在本地搭建 Go 運行環(huán)境,并安裝 kratos 工具,使用 kratos 工具創(chuàng)建項目 blog。
在 blog 項目中,集成第三方庫 kafka-go。
創(chuàng)建項目
示例代碼:
kratos new blog
安裝 kafka-go
go get github.com/segmentio/kafka-go
集成 Kafka Producer(生產(chǎn)者)和 Kafka Consumer(消費者)
編寫文件 blog/internal/data/data.go
導(dǎo)入第三方庫:
import (
"github.com/segmentio/kafka-go"
)
添加 Kafka Producer(生產(chǎn)者)和 Kafka Consumer(消費者):
// Data .
type Data struct {
// TODO wrapped database client
dbEngine *xorm.Engine
kp *kafkaProducer
kc *KafkaConsumer
}
// NewData .
func NewData(c *conf.Data, logger log.Logger, dbEngin *xorm.Engine, kp *kafkaProducer, kc *KafkaConsumer) (*Data, func(), error) {
cleanup := func() {
log.NewHelper(logger).Info("closing the data resources")
}
return &Data{
dbEngine: dbEngin,
kp: kp,
kc: kc,
}, cleanup, nil
}
Kafka Producer(生產(chǎn)者):
type kafkaProducer struct {
writer *kafka.Writer
}
func NewKafkaProducer(c *conf.Data) *kafkaProducer {
brokers := c.Kafka.Brokers
topic := c.Kafka.Topic
writer := &kafka.Writer{
Addr: kafka.TCP(brokers...),
Topic: topic,
Balancer: &kafka.LeastBytes{},
}
return &kafkaProducer{writer: writer}
}
func (p *kafkaProducer) SendMessage(ctx context.Context, key, value []byte) error {
err := p.writer.WriteMessages(ctx, kafka.Message{
Key: key,
Value: value,
})
if err != nil {
return err
}
return nil
}
func (p *kafkaProducer) Close() error {
return p.writer.Close()
}
Kafka Consumer(消費者):
type KafkaConsumer struct {
reader *kafka.Reader
}
func NewKafkaConsumer(c *conf.Data) *KafkaConsumer {
brokers := c.Kafka.Brokers
topic := c.Kafka.Topic
groupId := c.Kafka.GroupId
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
Topic: topic,
GroupID: groupId,
})
return &KafkaConsumer{
reader: reader,
}
}
func (c *KafkaConsumer) Start(ctx context.Context) {
for {
msg, err := c.reader.ReadMessage(ctx)
if err != nil {
return
}
log.Debugf("key=%s || value=%s", string(msg.Key), string(msg.Value))
}
}
func (c *KafkaConsumer) Close() error {
return c.reader.Close()
}
生產(chǎn) kafka 消息的方法:
創(chuàng)建文件 blog/internal/data/kafka.go。
示例代碼:
func (u *userRepository) KafkaSendMessage(ctx context.Context, key []byte, value []byte) (err error) {
defer u.data.kp.Close()
// 設(shè)置超時時間
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err = u.data.kp.SendMessage(ctx, key, value)
if err != nil {
log.Errorf("KafkaSendMessage() || err=%v", err)
return
}
return
}
閱讀上面這段代碼,我們可以發(fā)現(xiàn) KafkaSendMessage 方法封裝了生產(chǎn) kafka 消息的方法 u.data.kp.SendMessage。
需要注意的是,我們需要設(shè)置超時時間,否則,會返回錯誤消息 context deadline exceeded。
添加 wire 提供者:
// ProviderSet is data providers.
var ProviderSet = wire.NewSet(NewData, NewGreeterRepo, NewDbEngine, NewUserRepository, NewKafkaProducer, NewKafkaConsumer)
生成 wire 代碼:
cd blog/cmd/blog
wire
3.操作 Kafka
在 Kratos 項目中,一般在項目的 biz 或 service 層使用 Kafka 的生產(chǎn)邏輯;在 service 層使用 Kafka 的消費邏輯。
限于篇幅,我們以 Kafka 的生產(chǎn)邏輯為例,介紹怎么在 biz 層生產(chǎn) Kafka 消息。
編寫文件 blog/internal/biz/user.go,在 CreateUser 方法中添加生產(chǎn) Kafka 消息的代碼。
type UserRepository interface {
Create(ctx context.Context, user *User) (int64, error)
KafkaSendMessage(ctx context.Context, key []byte, value []byte) (err error)
}
func (u *UserUsecase) CreateUser(ctx context.Context, user *User) (id int64, err error) {
id, err = u.userRepo.Create(ctx, user)
if err != nil {
return
}
if id > 0 {
var b []byte
b, err = json.Marshal(user)
if err != nil {
return
}
err = u.userRepo.KafkaSendMessage(ctx, []byte(user.Name), b)
if err != nil {
return
}
}
return
}
閱讀上面這段代碼,我們可以發(fā)現(xiàn) UserRepository 接口中的方法 KafkaSendMessage,就是我們在 blog/internal/data/kafka.go 文件中實現(xiàn)的方法。
項目運行和測試:
Kratos 運行:
kratos run
curl 請求示例:
curl -H "Content-Type: application/json" -X POST -d '{"name":"mac", "email":"mac@gmail.com", "password":"123456"}' http://192.168.110.209:8000/user/create
kafka 消費者:
kafka_2.13-3.9.0/bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
{"Id":10,"Name":"mac","Email":"mac@gmail.com","Password":"123456","Created":1735972949,"Updated":1735972949}
4.總結(jié)
本文我們通過示例代碼,介紹 Kratos 微服務(wù)框架怎么集成第三方庫 kafka-go,操作 Kafka。
參考資料
[1]kafka-go: https://github.com/segmentio/kafka-go