Go使用consul做服務(wù)發(fā)現(xiàn)
一、目標(biāo)
二、使用步驟
1. 安裝 consul
我們可以直接使用官方提供的二進(jìn)制文件來進(jìn)行安裝部署,其官網(wǎng)地址為 https://www.consul.io/downloads

下載后為可執(zhí)行文件,在我們開發(fā)試驗(yàn)過程中,可以直接使用 consul agent -dev 命令來啟動(dòng)一個(gè)單節(jié)點(diǎn)的 consul
在啟動(dòng)的打印日志中可以看到 agent: Started HTTP server on 127.0.0.1:8500 (tcp), 我們可以在瀏覽器直接訪問 127.0.0.1:8500 即可看到如下

這里我們的 consul 就啟動(dòng)成功了
2. 服務(wù)注冊(cè)
在網(wǎng)絡(luò)編程中,一般會(huì)提供項(xiàng)目的 IP、PORT、PROTOCOL,在服務(wù)治理中,我們還需要知道對(duì)應(yīng)的服務(wù)名、實(shí)例名以及一些自定義的擴(kuò)展信息
在這里使用 ServiceInstance 接口來規(guī)定注冊(cè)服務(wù)時(shí)必須的一些信息,同時(shí)用 DefaultServiceInstance 實(shí)現(xiàn)
- type ServiceInstance interface {
- // return The unique instance ID as registered.
- GetInstanceId() string
- // return The service ID as registered.
- GetServiceId() string
- // return The hostname of the registered service instance.
- GetHost() string
- // return The port of the registered service instance.
- GetPort() int // return Whether the port of the registered service instance uses HTTPS.
- IsSecure() bool // return The key / value pair metadata associated with the service instance.
- GetMetadata() map[string]string
- }type DefaultServiceInstance struct {
- InstanceId string
- ServiceId string
- Host string
- Port int Secure bool Metadata map[string]string
- }func NewDefaultServiceInstance(serviceId string, host string, port int, secure bool,
- metadata map[string]string, instanceId string) (*DefaultServiceInstance, error) {
- // 如果沒有傳入 IP 則獲取一下,這個(gè)方法在多網(wǎng)卡的情況下,并不好用 if len(host) == 0 {
- localIP, err := util.GetLocalIP() if err != nil {
- return nil, err
- } host = localIP } if len(instanceId) == 0 {
- instanceId = serviceId + "-" + strconv.FormatInt(time.Now().Unix(), 10) + "-" + strconv.Itoa(rand.Intn(9000)+1000)
- } return &DefaultServiceInstance{InstanceId: instanceId, ServiceId: serviceId, Host: host, Port: port, Secure: secure, Metadata: metadata}, nil
- }func (serviceInstance DefaultServiceInstance) GetInstanceId() string {
- return serviceInstance.InstanceId
- }func (serviceInstance DefaultServiceInstance) GetServiceId() string {
- return serviceInstance.ServiceId
- }func (serviceInstance DefaultServiceInstance) GetHost() string {
- return serviceInstance.Host
- }func (serviceInstance DefaultServiceInstance) GetPort() int { return serviceInstance.Port
- }func (serviceInstance DefaultServiceInstance) IsSecure() bool { return serviceInstance.Secure
- }func (serviceInstance DefaultServiceInstance) GetMetadata() map[string]string {
- return serviceInstance.Metadata
- }
定義接口
在上面規(guī)定了需要注冊(cè)的服務(wù)的必要信息,下面定義下服務(wù)注冊(cè)和剔除的方法
- type ServiceRegistry interface {
- Register(serviceInstance cloud.ServiceInstance) bool
- Deregister()
- }
具體實(shí)現(xiàn)
因?yàn)?consul 提供了 http 接口來對(duì) consul 進(jìn)行操作,我們也可以使用 http 請(qǐng)求方式進(jìn)行注冊(cè)和剔除操作,具體 http 接口文檔見 https://www.consul.io/api-docs, consul 默認(rèn)提供了go 語(yǔ)言的實(shí)現(xiàn),這里直接使用 github.com/hashicorp/consul/api
- import (
- "errors"
- "fmt"
- "github.com/hashicorp/consul/api"
- "strconv"
- "unsafe"
- )type consulServiceRegistry struct {
- serviceInstances map[string]map[string]cloud.ServiceInstance
- client api.Client localServiceInstance cloud.ServiceInstance}func (c consulServiceRegistry) Register(serviceInstance cloud.ServiceInstance) bool { // 創(chuàng)建注冊(cè)到consul的服務(wù)到 registration := new(api.AgentServiceRegistration) registration.ID = serviceInstance.GetInstanceId() registration.Name = serviceInstance.GetServiceId() registration.Port = serviceInstance.GetPort() var tags []string
- if serviceInstance.IsSecure() {
- tags = append(tags, "secure=true")
- } else {
- tags = append(tags, "secure=false")
- } if serviceInstance.GetMetadata() != nil {
- var tags []string
- for key, value := range serviceInstance.GetMetadata() {
- tags = append(tags, key+"="+value)
- } registration.Tags = tags } registration.Tags = tags registration.Address = serviceInstance.GetHost() // 增加consul健康檢查回調(diào)函數(shù) check := new(api.AgentServiceCheck) schema := "http"
- if serviceInstance.IsSecure() {
- schema = "https"
- } check.HTTP = fmt.Sprintf("%s://%s:%d/actuator/health", schema, registration.Address, registration.Port)
- check.Timeout = "5s"
- check.Interval = "5s"
- check.DeregisterCriticalServiceAfter = "20s" // 故障檢查失敗30s后 consul自動(dòng)將注冊(cè)服務(wù)刪除
- registration.Check = check // 注冊(cè)服務(wù)到consul err := c.client.Agent().ServiceRegister(registration) if err != nil {
- fmt.Println(err) return false
- } if c.serviceInstances == nil {
- c.serviceInstances = map[string]map[string]cloud.ServiceInstance{}
- } services := c.serviceInstances[serviceInstance.GetServiceId()] if services == nil {
- services = map[string]cloud.ServiceInstance{}
- } services[serviceInstance.GetInstanceId()] = serviceInstance c.serviceInstances[serviceInstance.GetServiceId()] = services c.localServiceInstance = serviceInstance return true
- }// deregister a servicefunc (c consulServiceRegistry) Deregister() { if c.serviceInstances == nil {
- return
- } services := c.serviceInstances[c.localServiceInstance.GetServiceId()] if services == nil {
- return
- } delete(services, c.localServiceInstance.GetInstanceId()) if len(services) == 0 {
- delete(c.serviceInstances, c.localServiceInstance.GetServiceId()) } _ = c.client.Agent().ServiceDeregister(c.localServiceInstance.GetInstanceId()) c.localServiceInstance = nil
- }// new a consulServiceRegistry instance// token is optionalfunc NewConsulServiceRegistry(host string, port int, token string) (*consulServiceRegistry, error) {
- if len(host) < 3 {
- return nil, errors.New("check host")
- } if port <= 0 || port > 65535 {
- return nil, errors.New("check port, port should between 1 and 65535")
- } config := api.DefaultConfig()
- config.Address = host + ":" + strconv.Itoa(port)
- config.Token = token
- client, err := api.NewClient(config)
- if err != nil {
- return nil, err
- } return &consulServiceRegistry{client: *client}, nil
- }
測(cè)試用例
注冊(cè)服務(wù)的代碼基本完成,來測(cè)試一下
- func TestConsulServiceRegistry(t *testing.T) {
- host := "127.0.0.1"
- port := 8500
- registryDiscoveryClient, _ := extension.NewConsulServiceRegistry(host, port, "")
- ip, err := util.GetLocalIP() if err != nil {
- t.Error(err) } serviceInstanceInfo, _ := cloud.NewDefaultServiceInstance("go-user-server", "", 8090,
- false, map[string]string{"user":"zyn"}, "")
- registryDiscoveryClient.Register(serviceInstanceInfo) r := gin.Default() // 健康檢測(cè)接口,其實(shí)只要是 200 就認(rèn)為成功了
- r.GET("/actuator/health", func(c *gin.Context) {
- c.JSON(200, gin.H{
- "message": "pong",
- })
- })
- err = r.Run(":8090")
- if err != nil{
- registryDiscoveryClient.Deregister()
- }
- }
如果成功,則會(huì)在 consul 看到 go-user-server 這個(gè)服務(wù)
3. 服務(wù)發(fā)現(xiàn)
在服務(wù)發(fā)現(xiàn)中,一般會(huì)需要兩個(gè)方法
- 獲取所有的服務(wù)列表
- 獲取指定的服務(wù)的所有實(shí)例信息
接口定義
- type DiscoveryClient interface {
- /**
- * Gets all ServiceInstances associated with a particular serviceId.
- * @param serviceId The serviceId to query.
- * @return A List of ServiceInstance.
- */
- GetInstances(serviceId string) ([]cloud.ServiceInstance, error) /**
- * @return All known service IDs.
- */
- GetServices() ([]string, error)}
具體實(shí)現(xiàn)
來實(shí)現(xiàn)一下
- type consulServiceRegistry struct {
- serviceInstances map[string]map[string]cloud.ServiceInstance
- client api.Client localServiceInstance cloud.ServiceInstance}func (c consulServiceRegistry) GetInstances(serviceId string) ([]cloud.ServiceInstance, error) {
- catalogService, _, _ := c.client.Catalog().Service(serviceId, "", nil)
- if len(catalogService) > 0 {
- result := make([]cloud.ServiceInstance, len(catalogService))
- for index, sever := range catalogService {
- s := cloud.DefaultServiceInstance{ InstanceId: sever.ServiceID, ServiceId: sever.ServiceName, Host: sever.Address, Port: sever.ServicePort, Metadata: sever.ServiceMeta, } result[index] = s } return result, nil
- } return nil, nil
- }func (c consulServiceRegistry) GetServices() ([]string, error) {
- services, _, _ := c.client.Catalog().Services(nil)
- result := make([]string, unsafe.Sizeof(services))
- index := 0
- for serviceName, _ := range services {
- result[index] = serviceName index++ } return result, nil
- }// new a consulServiceRegistry instance
- // token is optional
- func NewConsulServiceRegistry(host string, port int, token string) (*consulServiceRegistry, error) {
- if len(host) < 3 {
- return nil, errors.New("check host")
- }
- if port <= 0 || port > 65535 {
- return nil, errors.New("check port, port should between 1 and 65535")
- }
- config := api.DefaultConfig()
- config.Address = host + ":" + strconv.Itoa(port)
- config.Token = token
- client, err := api.NewClient(config)
- if err != nil {
- return nil, err
- }
- return &consulServiceRegistry{client: *client}, nil
- }
測(cè)試用例
- func TestConsulServiceDiscovery(t *testing.T) {
- host := "127.0.0.1"
- port := 8500
- token := ""
- registryDiscoveryClient, err := extension.NewConsulServiceRegistry(host, port, token)
- if err != nil { panic(err)
- } t.Log(registryDiscoveryClient.GetServices())
- t.Log(registryDiscoveryClient.GetInstances("go-user-server"))
- }
結(jié)果
- consul_service_registry_test.go:57: [consul go-user-server ] <nil>
- consul_service_registry_test.go:59: [{go-user-server-1602590661-56179 go-user-server 127.0.0.1 8090 false map[user:zyn]}] <nil>
總結(jié)
通過使用 consul api 我們可以簡(jiǎn)單的實(shí)現(xiàn)基于 consul 的服務(wù)發(fā)現(xiàn),在通過結(jié)合 http rpc 就可簡(jiǎn)單的實(shí)現(xiàn)服務(wù)的調(diào)用,下面一章來簡(jiǎn)單講下 go 如何發(fā)起 http 請(qǐng)求,為我們做 rpc 做個(gè)鋪墊