Add batched exports to the Prometheus Remote Write Exporter #34

Closed · wants to merge 10 commits
Changes from 4 commits
36 changes: 30 additions & 6 deletions exporter/prometheusremotewriteexporter/exporter.go
@@ -143,9 +143,9 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int
 		}
 	}
 
-	if err := prwe.export(ctx, tsMap); err != nil {
+	if exportErrors := prwe.export(ctx, tsMap); exportErrors != nil {
 		dropped = md.MetricCount()
-		errs = append(errs, consumererror.Permanent(err))
+		errs = append(errs, exportErrors...)
 	}
 
 	if dropped != 0 {
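With export now returning a slice of per-batch errors, PushMetrics appends them all and eventually has to fold the slice into the single error it reports upstream. The following hand-rolled fold is purely illustrative (it is not code from this PR, and the collector's own error-combining helper would normally be used instead); it assumes fmt and strings are imported:

```go
// joinErrors collapses a slice of export errors into one error.
// Illustrative only; shown here just to make the []error contract concrete.
func joinErrors(errs []error) error {
	if len(errs) == 0 {
		return nil
	}
	msgs := make([]string, 0, len(errs))
	for _, err := range errs {
		msgs = append(msgs, err.Error())
	}
	return fmt.Errorf("%d batched export error(s): %s", len(errs), strings.Join(msgs, "; "))
}
```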
@@ -252,13 +252,37 @@ func (prwe *PrwExporter) handleSummaryMetric(tsMap map[string]*prompb.TimeSeries
 }
 
 // export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order
-func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error {
-	// Calls the helper function to convert the TsMap to the desired format
-	req, err := wrapTimeSeries(tsMap)
+func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) []error {
+	var errs []error
+	// Calls the helper function to convert and batch the TsMap to the desired format
+	requests, err := wrapAndBatchTimeSeries(tsMap)
 	if err != nil {
-		return consumererror.Permanent(err)
+		errs = append(errs, err)
+		return errs
 	}
 
+	var wg sync.WaitGroup
+	var mtx sync.Mutex
+
+	// performs the batched requests concurrently
+	for _, request := range requests {
Review comment:

One thing I'm worried about is that this can create a large number of concurrent outgoing requests. We should cap the max number of concurrent requests.

@JasonXZLiu (Author) replied on Dec 3, 2020:

Added a cap on the number of concurrent requests (with some logs) and addressed the comments you suggested above. Here's an example of the logs produced with this commit where the cap was 3 concurrent requests.

total metrics to process: 194128
tsArray length: 44566
sizeOfCurrentBatch: 2999930
tsArray length: 44560
sizeOfCurrentBatch: 2999986
tsArray length: 44555
sizeOfCurrentBatch: 2999962
tsArray length: 44553
sizeOfCurrentBatch: 2999978
tsArray length: 15894
sizeOfCurrentBatch: 1069220
next batch of requests
making request 0
making request 1
making request 2
next batch of requests
making request 4
making request 3

+		wg.Add(1)
+		go func(req *prompb.WriteRequest) {
+			requestError := prwe.execute(ctx, req)
+			if requestError != nil {
+				mtx.Lock()
+				errs = append(errs, consumererror.Permanent(requestError))
+				mtx.Unlock()
+			}
+			wg.Done()
+		}(request)
+	}
+	wg.Wait()
+
+	return errs
+}
+
+func (prwe *PrwExporter) execute(ctx context.Context, req *prompb.WriteRequest) error {
 	// Uses proto.Marshal to convert the WriteRequest into bytes array
 	data, err := proto.Marshal(req)
 	if err != nil {
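The export above launches one goroutine per batched WriteRequest with no upper bound, which is exactly the concern raised in the review thread; the cap the author mentions appears to land in a later commit that is not part of the diff shown here. Below is a minimal sketch of one way to bound the fan-out with a buffered-channel semaphore. The maxConcurrentRequests value and the exportBounded name are illustrative assumptions, not the actual follow-up commit, and the sketch assumes the same package context as exporter.go (sync, prompb, consumererror, and prwe.execute):

```go
// Hypothetical cap on in-flight remote-write requests; the real commit may
// use a different value or make it configurable.
const maxConcurrentRequests = 3

// exportBounded is an illustrative variant of export that limits the number
// of concurrent requests with a buffered channel used as a counting semaphore.
func (prwe *PrwExporter) exportBounded(ctx context.Context, requests []*prompb.WriteRequest) []error {
	var errs []error
	var mtx sync.Mutex
	var wg sync.WaitGroup

	// A send acquires a slot and a receive releases it, so at most
	// maxConcurrentRequests goroutines are running execute at any time.
	sem := make(chan struct{}, maxConcurrentRequests)

	for _, request := range requests {
		wg.Add(1)
		sem <- struct{}{} // blocks once the cap is reached
		go func(req *prompb.WriteRequest) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot

			if err := prwe.execute(ctx, req); err != nil {
				mtx.Lock()
				errs = append(errs, consumererror.Permanent(err))
				mtx.Unlock()
			}
		}(request)
	}
	wg.Wait()

	return errs
}
```

Acquiring the slot before spawning the goroutine keeps the number of goroutines itself bounded, not just the number of in-flight HTTP calls.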
17 changes: 10 additions & 7 deletions exporter/prometheusremotewriteexporter/exporter_test.go
@@ -227,17 +227,19 @@ func Test_export(t *testing.T) {
 			if !tt.serverUp {
 				server.Close()
 			}
-			err := runExportPipeline(ts1, serverURL)
+			errs := runExportPipeline(ts1, serverURL)
 			if tt.returnError {
-				assert.Error(t, err)
+				assert.Error(t, errs[0])
 				return
 			}
-			assert.NoError(t, err)
+			assert.Len(t, errs, 0)
 		})
 	}
 }
 
-func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) error {
+func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) []error {
+	var errs []error
+
 	// First we will construct a TimeSeries array from the testutils package
 	testmap := make(map[string]*prompb.TimeSeries)
 	testmap["test"] = ts
@@ -246,10 +248,11 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) error {
 	// after this, instantiate a CortexExporter with the current HTTP client and endpoint set to passed in endpoint
 	prwe, err := NewPrwExporter("test", endpoint.String(), HTTPClient, map[string]string{})
 	if err != nil {
-		return err
+		errs = append(errs, err)
+		return errs
 	}
-	err = prwe.export(context.Background(), testmap)
-	return err
+	errs = append(errs, prwe.export(context.Background(), testmap)...)
+	return errs
 }
 
 // Test_PushMetrics checks the number of TimeSeries received by server and the number of metrics dropped is the same as
45 changes: 38 additions & 7 deletions exporter/prometheusremotewriteexporter/helper.go
@@ -23,6 +23,7 @@ import (
 	"time"
 	"unicode"
 
+	"github.com/gogo/protobuf/proto"
 	"github.com/prometheus/prometheus/prompb"
 
 	"go.opentelemetry.io/collector/consumer/pdata"
@@ -41,6 +42,10 @@ const (
 	totalStr = "total"
 	delimeter = "_"
 	keyStr = "key"
+
+	checkByteSizeInterval = 5000
+	maxBatchByteSize      = 3000000
+	maxBatchMetric        = 40000
 )
 
 // ByLabelName enables the usage of sort.Sort() with a slice of labels
@@ -215,21 +220,47 @@ func getPromMetricName(metric *otlp.Metric, ns string) string {
 	return sanitize(b.String())
 }
 
-// Simple helper function that takes the <Signature String - *TimeSeries> map
-// and creates a WriteRequest from the struct -- can move to the helper.go file
-func wrapTimeSeries(tsMap map[string]*prompb.TimeSeries) (*prompb.WriteRequest, error) {
+// Helper function that takes the <Signature String - *TimeSeries> map
+// and creates a batch of WriteRequest from the struct
+func wrapAndBatchTimeSeries(tsMap map[string]*prompb.TimeSeries) ([]*prompb.WriteRequest, error) {
 	if len(tsMap) == 0 {
 		return nil, errors.New("invalid tsMap: cannot be empty map")
 	}
+
+	var requests []*prompb.WriteRequest
 	var tsArray []prompb.TimeSeries
+	missingLastBatch := true
+
 	for _, v := range tsMap {
 		tsArray = append(tsArray, *v)
+		missingLastBatch = true
+
+		if len(tsArray)%checkByteSizeInterval == 0 {
+			wrapped := prompb.WriteRequest{
+				Timeseries: tsArray,
+				// Other parameters of the WriteRequest are unnecessary for our Export
+			}
+
+			if b := proto.Size(&wrapped); b >= maxBatchByteSize || len(tsArray) >= maxBatchMetric {
+				requests = append(requests, &wrapped)
+				tsArray = make([]prompb.TimeSeries, 0)
+
+				missingLastBatch = false
+			}
+		}
 	}
-	wrapped := prompb.WriteRequest{
-		Timeseries: tsArray,
-		// Other parameters of the WriteRequest are unnecessary for our Export
+
+	if missingLastBatch {
+		wrapped := prompb.WriteRequest{
+			Timeseries: tsArray,
+			// Other parameters of the WriteRequest are unnecessary for our Export
+		}
+
+		requests = append(requests, &wrapped)
+		tsArray = make([]prompb.TimeSeries, 0)
 	}
-	return &wrapped, nil
+
+	return requests, nil
 }
 
 // convertTimeStamp converts OTLP timestamp in ns to timestamp in ms
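To make the batching thresholds concrete, here is a small, test-style driver that pushes synthetic series through wrapAndBatchTimeSeries and checks that the maxBatchMetric cap splits them into multiple WriteRequests without dropping any series. It is an illustrative sketch only; the series shape and the test name are made up, and real series come from the converted OTLP metrics:

```go
package prometheusremotewriteexporter

import (
	"fmt"
	"testing"

	"github.com/prometheus/prometheus/prompb"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// Illustrative sketch only: 100k tiny series stay well under maxBatchByteSize,
// so the 40000-series count cap is what forces the split into three batches.
func Test_wrapAndBatchTimeSeries_splitsLargeMaps(t *testing.T) {
	tsMap := make(map[string]*prompb.TimeSeries)
	for i := 0; i < 100000; i++ {
		tsMap[fmt.Sprintf("sig-%d", i)] = &prompb.TimeSeries{
			Labels:  []prompb.Label{{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}},
			Samples: []prompb.Sample{{Value: float64(i), Timestamp: 1}},
		}
	}

	requests, err := wrapAndBatchTimeSeries(tsMap)
	require.NoError(t, err)

	// Expect 40000 + 40000 + 20000: two full batches plus the trailing batch
	// emitted by the missingLastBatch branch.
	assert.Len(t, requests, 3)

	total := 0
	for _, req := range requests {
		total += len(req.Timeseries)
	}
	assert.Equal(t, len(tsMap), total)
}
```

Because proto.Size walks the whole request, the checkByteSizeInterval constant presumably exists to keep that measurement to every 5000 appended series rather than every append.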