遠程寫入prometheus存儲
簡介
prometheus一般都是采用pull方式獲取數據,但是有一些情況下,不方便配置exporter,就希望能通過push的方式上傳指標數據。
1、可以采用pushgateway的方式,推送到pushgateway,然后prometheus通過pushgateway拉取數據。
2、在新版本中增加了一個參數:--enable-feature=remote-write-receiver,允許遠程通過接口/api/v1/write,直接寫數據到prometheus里面(v2.33 及之後的版本改用參數 --web.enable-remote-write-receiver)。
pushgateway在高並發的情況下還是比較消耗資源的,特別是開啟一致性檢查時,高並發寫入會特別慢。
第二種方式少了一層轉發(fā),速度應該比較快。

接口
可以通過prometheus的http接口/api/v1/write提交數據,這個接口對數據格式有以下要求:
- 使用POST方式提交
- 需要經過protobuf編碼,依賴github.com/gogo/protobuf/proto
- 必須使用snappy(block格式)進行壓縮,接收端會無條件解壓,依賴github.com/golang/snappy
步驟:
- 收集指標名稱,時間戳,值和標簽
- 將數據轉換成prometheus需要的數據格式
- 使用proto對數據進行編碼,并用snappy進行壓縮
- 通過httpClient提交數據
- package prome
- import (
- "bufio"
- "bytes"
- "context"
- "io"
- "io/ioutil"
- "net/http"
- "net/url"
- "regexp"
- "time"
- "github.com/gogo/protobuf/proto"
- "github.com/golang/snappy"
- "github.com/opentracing-contrib/go-stdlib/nethttp"
- opentracing "github.com/opentracing/opentracing-go"
- "github.com/pkg/errors"
- "github.com/prometheus/common/model"
- "github.com/prometheus/prometheus/pkg/labels"
- "github.com/prometheus/prometheus/prompb"
- )
// RecoverableError marks a failure that is worth retrying: remoteWritePost
// returns it for transport errors from Client.Do and for 5xx responses.
// The original error is embedded, so Error() reports it unchanged.
type RecoverableError struct {
	error
}

// Unwrap returns the wrapped error so that errors.Is and errors.As can see
// through a RecoverableError (for example to detect a context deadline
// surfaced by Client.Do).
func (e RecoverableError) Unwrap() error {
	return e.error
}
// HttpClient pushes samples to a Prometheus remote-write endpoint.
// Construct it with NewClient.
type HttpClient struct {
	// Client is the HTTP client used to send each remote-write request.
	Client *http.Client

	url     *url.URL      // endpoint that requests are POSTed to
	timeout time.Duration // per-request deadline used by remoteWritePost
}
- var MetricNameRE = regexp.MustCompile(`^[a-zA-Z_:][a-zA-Z0-9_:]*$`)
// MetricPoint is one sample handed to RemoteWrite.
type MetricPoint struct {
	// Metric is the metric name; it must match MetricNameRE.
	Metric string `json:"metric"`
	// TagsMap is the label set. Keys that are not valid Prometheus label
	// names are dropped during conversion.
	TagsMap map[string]string `json:"tags"`
	// Time is the sample timestamp in Unix seconds.
	Time int64 `json:"time"`
	// Value is the sample value.
	Value float64 `json:"value"`
}
- func (c *HttpClient) remoteWritePost(req []byte) error {
- httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(req))
- if err != nil {
- return err
- }
- httpReq.Header.Add("Content-Encoding", "snappy")
- httpReq.Header.Set("Content-Type", "application/x-protobuf")
- httpReq.Header.Set("User-Agent", "opcai")
- httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
- ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
- defer cancel()
- httpReq = httpReq.WithContext(ctx)
- if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {
- var ht *nethttp.Tracer
- httpReq, ht = nethttp.TraceRequest(
- parentSpan.Tracer(),
- httpReq,
- nethttp.OperationName("Remote Store"),
- nethttp.ClientTrace(false),
- )
- defer ht.Finish()
- }
- httpResp, err := c.Client.Do(httpReq)
- if err != nil {
- // Errors from Client.Do are from (for example) network errors, so are
- // recoverable.
- return RecoverableError{err}
- }
- defer func() {
- io.Copy(ioutil.Discard, httpResp.Body)
- httpResp.Body.Close()
- }()
- if httpResp.StatusCode/100 != 2 {
- scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, 512))
- line := ""
- if scanner.Scan() {
- line = scanner.Text()
- }
- err = errors.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)
- }
- if httpResp.StatusCode/100 == 5 {
- return RecoverableError{err}
- }
- return err
- }
- func buildWriteRequest(samples []*prompb.TimeSeries) ([]byte, error) {
- req := &prompb.WriteRequest{
- Timeseries: samples,
- }
- data, err := proto.Marshal(req)
- if err != nil {
- return nil, err
- }
- compressed := snappy.Encode(nil, data)
- return compressed, nil
- }
- type sample struct {
- labels labels.Labels
- t int64
- v float64
- }
const (
	// MetricNameLabel is the reserved Prometheus label that carries the
	// metric name.
	MetricNameLabel = "__name__"

	// LABEL_NAME is kept so existing callers keep compiling.
	//
	// Deprecated: use MetricNameLabel, which follows Go naming conventions.
	LABEL_NAME = MetricNameLabel
)
- func convertOne(item *MetricPoint) (*prompb.TimeSeries, error) {
- pt := prompb.TimeSeries{}
- pt.Samples = []prompb.Sample{{}}
- s := sample{}
- s.t = item.Time
- s.v = item.Value
- // name
- if !MetricNameRE.MatchString(item.Metric) {
- return &pt, errors.New("invalid metrics name")
- }
- nameLs := labels.Label{
- Name: LABEL_NAME,
- Value: item.Metric,
- }
- s.labels = append(s.labels, nameLs)
- for k, v := range item.TagsMap {
- if model.LabelNameRE.MatchString(k) {
- ls := labels.Label{
- Name: k,
- Value: v,
- }
- s.labels = append(s.labels, ls)
- }
- }
- pt.Labels = labelsToLabelsProto(s.labels, pt.Labels)
- // 時間賦值問題,使用毫秒時間戳
- tsMs := time.Unix(s.t, 0).UnixNano() / 1e6
- pt.Samples[0].Timestamp = tsMs
- pt.Samples[0].Value = s.v
- return &pt, nil
- }
- func labelsToLabelsProto(labels labels.Labels, buf []*prompb.Label) []*prompb.Label {
- result := buf[:0]
- if cap(buf) < len(labels) {
- result = make([]*prompb.Label, 0, len(labels))
- }
- for _, l := range labels {
- result = append(result, &prompb.Label{
- Name: l.Name,
- Value: l.Value,
- })
- }
- return result
- }
- func (c *HttpClient) RemoteWrite(items []MetricPoint) (err error) {
- if len(items) == 0 {
- return
- }
- ts := make([]*prompb.TimeSeries, len(items))
- for i := range items {
- ts[i], err = convertOne(&items[i])
- if err != nil {
- return
- }
- }
- data, err := buildWriteRequest(ts)
- if err != nil {
- return
- }
- err = c.remoteWritePost(data)
- return
- }
- func NewClient(ur string, timeout time.Duration) (c *HttpClient, err error) {
- u, err := url.Parse(ur)
- if err != nil {
- return
- }
- c = &HttpClient{
- url: u,
- Client: &http.Client{},
- timeout: timeout,
- }
- return
- }
測試
prometheus啟動的時候記得加參數--enable-feature=remote-write-receiver(v2.33 及之後的版本改用 --web.enable-remote-write-receiver)。
- package prome
- import (
- "testing"
- "time"
- )
- func TestRemoteWrite(t *testing.T) {
- c, err := NewClient("http://localhost:9090/api/v1/write", 10*time.Second)
- if err != nil {
- t.Fatal(err)
- }
- metrics := []MetricPoint{
- {Metric: "opcai1",
- TagsMap: map[string]string{"env": "testing", "op": "opcai"},
- Time: time.Now().Add(-1 * time.Minute).Unix(),
- Value: 1},
- {Metric: "opcai2",
- TagsMap: map[string]string{"env": "testing", "op": "opcai"},
- Time: time.Now().Add(-2 * time.Minute).Unix(),
- Value: 2},
- {Metric: "opcai3",
- TagsMap: map[string]string{"env": "testing", "op": "opcai"},
- Time: time.Now().Unix(),
- Value: 3},
- {Metric: "opcai4",
- TagsMap: map[string]string{"env": "testing", "op": "opcai"},
- Time: time.Now().Unix(),
- Value: 4},
- }
- err = c.RemoteWrite(metrics)
- if err != nil {
- t.Fatal(err)
- }
- t.Log("end...")
- }
使用go test進行測試
- go test -v
總結
這個方法是在閱讀夜鶯v5的代碼時發現的。正好有統一收集redis監控指標的需求,就用上了;之前用pushgateway實現的方式實在是太慢。