新聞中心
prometheus一般都是采用pull方式獲取數(shù)據(jù),但是有一些情況下,不方便配置exporter,就希望能通過push的方式上傳指標數(shù)據(jù)。

10年積累的成都網(wǎng)站設(shè)計、網(wǎng)站建設(shè)經(jīng)驗,可以快速應(yīng)對客戶對網(wǎng)站的新想法和需求。提供各種問題對應(yīng)的解決方案。讓選擇我們的客戶得到更好、更有力的網(wǎng)絡(luò)服務(wù)。我雖然不認識你,你也不認識我。但先網(wǎng)站制作后付款的網(wǎng)站建設(shè)流程,更有陽明免費網(wǎng)站建設(shè)讓你可以放心的選擇與我們合作。
簡介
prometheus一般都是采用pull方式獲取數(shù)據(jù),但是有一些情況下,不方便配置exporter,就希望能通過push的方式上傳指標數(shù)據(jù)。
1、可以采用pushgateway的方式,推送到pushgateway,然后prometheus通過pushgateway拉取數(shù)據(jù)。
2、在新版本中增加了一個參數(shù):–enable-feature=remote-write-receiver,允許遠程通過接口/api/v1/write,直接寫數(shù)據(jù)到prometheus里面。
pushgateway在高并發(fā)的情況下還是比較消耗資源的,特別是開啟一致性檢查,高并發(fā)寫入的時候特別慢。
第二種方式少了一層轉(zhuǎn)發(fā),速度應(yīng)該比較快。
接口
可以通過prometheus的http接口/api/v1/write提交數(shù)據(jù),這個接口的數(shù)據(jù)格式有有要求: 使用POST方式提交 需要經(jīng)過protobuf編碼,依賴github.com/gogo/protobuf/proto 可以使用snappy進行壓縮,依賴github.com/golang/snappy
步驟:
收集指標名稱,時間戳,值和標簽 將數(shù)據(jù)轉(zhuǎn)換成prometheus需要的數(shù)據(jù)格式 使用proto對數(shù)據(jù)進行編碼,并用snappy進行壓縮 通過httpClient提交數(shù)據(jù)
package prome
import (
"bufio"
"bytes"
"context"
"io"
"io/ioutil"
"net/http"
"net/url"
"regexp"
"time"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/opentracing-contrib/go-stdlib/nethttp"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/prompb"
)
type RecoverableError struct {
error
}
type HttpClient struct {
url *url.URL
Client *http.Client
timeout time.Duration
}
var MetricNameRE = regexp.MustCompile(`^[a-zA-Z_:][a-zA-Z0-9_:]*$`)
type MetricPoint struct {
Metric string `json:"metric"` // 指標名稱
TagsMap map[string]string `json:"tags"` // 數(shù)據(jù)標簽
Time int64 `json:"time"` // 時間戳,單位是秒
Value float64 `json:"value"` // 內(nèi)部字段,最終轉(zhuǎn)換之后的float64數(shù)值
}
func (c *HttpClient) remoteWritePost(req []byte) error {
httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(req))
if err != nil {
return err
}
httpReq.Header.Add("Content-Encoding", "snappy")
httpReq.Header.Set("Content-Type", "application/x-protobuf")
httpReq.Header.Set("User-Agent", "opcai")
httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()
httpReq = httpReq.WithContext(ctx)
if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {
var ht *nethttp.Tracer
httpReq, ht = nethttp.TraceRequest(
parentSpan.Tracer(),
httpReq,
nethttp.OperationName("Remote Store"),
nethttp.ClientTrace(false),
)
defer ht.Finish()
}
httpResp, err := c.Client.Do(httpReq)
if err != nil {
// Errors from Client.Do are from (for example) network errors, so are
// recoverable.
return RecoverableError{err}
}
defer func() {
io.Copy(ioutil.Discard, httpResp.Body)
httpResp.Body.Close()
}()
if httpResp.StatusCode/100 != 2 {
scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, 512))
line := ""
if scanner.Scan() {
line = scanner.Text()
}
err = errors.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)
}
if httpResp.StatusCode/100 == 5 {
return RecoverableError{err}
}
return err
}
func buildWriteRequest(samples []*prompb.TimeSeries) ([]byte, error) {
req := &prompb.WriteRequest{
Timeseries: samples,
}
data, err := proto.Marshal(req)
if err != nil {
return nil, err
}
compressed := snappy.Encode(nil, data)
return compressed, nil
}
type sample struct {
labels labels.Labels
t int64
v float64
}
const (
LABEL_NAME = "__name__"
)
func convertOne(item *MetricPoint) (*prompb.TimeSeries, error) {
pt := prompb.TimeSeries{}
pt.Samples = []prompb.Sample{{}}
s := sample{}
s.t = item.Time
s.v = item.Value
// name
if !MetricNameRE.MatchString(item.Metric) {
return &pt, errors.New("invalid metrics name")
}
nameLs := labels.Label{
Name: LABEL_NAME,
Value: item.Metric,
}
s.labels = append(s.labels, nameLs)
for k, v := range item.TagsMap {
if model.LabelNameRE.MatchString(k) {
ls := labels.Label{
Name: k,
Value: v,
}
s.labels = append(s.labels, ls)
}
}
pt.Labels = labelsToLabelsProto(s.labels, pt.Labels)
// 時間賦值問題,使用毫秒時間戳
tsMs := time.Unix(s.t, 0).UnixNano() / 1e6
pt.Samples[0].Timestamp = tsMs
pt.Samples[0].Value = s.v
return &pt, nil
}
func labelsToLabelsProto(labels labels.Labels, buf []*prompb.Label) []*prompb.Label {
result := buf[:0]
if cap(buf) for _, l := range labels {
result = append(result, &prompb.Label{
Name: l.Name,
Value: l.Value,
})
}
return result
}
func (c *HttpClient) RemoteWrite(items []MetricPoint) (err error) {
if len(items) == 0 {
return
}
ts := make([]*prompb.TimeSeries, len(items))
for i := range items {
ts[i], err = convertOne(&items[i])
if err != nil {
return
}
}
data, err := buildWriteRequest(ts)
if err != nil {
return
}
err = c.remoteWritePost(data)
return
}
func NewClient(ur string, timeout time.Duration) (c *HttpClient, err error) {
u, err := url.Parse(ur)
if err != nil {
return
}
c = &HttpClient{
url: u,
Client: &http.Client{},
timeout: timeout,
}
return
}
測試
prometheus啟動的時候記得加參數(shù)–enable-feature=remote-write-receiver
package prome
import (
"testing"
"time"
)
func TestRemoteWrite(t *testing.T) {
c, err := NewClient("http://localhost:9090/api/v1/write", 10*time.Second)
if err != nil {
t.Fatal(err)
}
metrics := []MetricPoint{
{Metric: "opcai1",
TagsMap: map[string]string{"env": "testing", "op": "opcai"},
Time: time.Now().Add(-1 * time.Minute).Unix(),
Value: 1},
{Metric: "opcai2",
TagsMap: map[string]string{"env": "testing", "op": "opcai"},
Time: time.Now().Add(-2 * time.Minute).Unix(),
Value: 2},
{Metric: "opcai3",
TagsMap: map[string]string{"env": "testing", "op": "opcai"},
Time: time.Now().Unix(),
Value: 3},
{Metric: "opcai4",
TagsMap: map[string]string{"env": "testing", "op": "opcai"},
Time: time.Now().Unix(),
Value: 4},
}
err = c.RemoteWrite(metrics)
if err != nil {
t.Fatal(err)
}
t.Log("end...")
}
使用go test進行測試
go test -v
新聞名稱:遠程寫入prometheus存儲具體方法
文章鏈接:http://fisionsoft.com.cn/article/dpsgjdd.html


咨詢
建站咨詢
