From 8954aff06d2388c288a0b5c5f0d4c986f003b25e Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Fri, 5 Oct 2018 01:05:10 +0100 Subject: [PATCH] Extract CompressWithGzip into the internal package --- internal/internal.go | 20 ++++++++++++++++++++ internal/internal_test.go | 20 ++++++++++++++++++++ plugins/outputs/http/http.go | 17 +---------------- plugins/outputs/influxdb/http.go | 18 ++---------------- plugins/outputs/influxdb_v2/http.go | 18 ++---------------- 5 files changed, 45 insertions(+), 48 deletions(-) diff --git a/internal/internal.go b/internal/internal.go index f7d75dfb3a301..de9a496116040 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -3,8 +3,10 @@ package internal import ( "bufio" "bytes" + "compress/gzip" "crypto/rand" "errors" + "io" "log" "math/big" "os" @@ -208,3 +210,21 @@ func ExitStatus(err error) (int, bool) { } return 0, false } + +// CompressWithGzip takes an io.Reader as input and pipes +// it through a gzip.Writer returning an io.Reader containing +// the gzipped data. +// An error is returned if passing data to the gzip.Writer fails +func CompressWithGzip(data io.Reader) (io.Reader, error) { + pipeReader, pipeWriter := io.Pipe() + gzipWriter := gzip.NewWriter(pipeWriter) + var err error + + go func() { + _, err = io.Copy(gzipWriter, data) + gzipWriter.Close() + pipeWriter.Close() + }() + + return pipeReader, err +} diff --git a/internal/internal_test.go b/internal/internal_test.go index ee1d24418ad18..3b4ec5dda0713 100644 --- a/internal/internal_test.go +++ b/internal/internal_test.go @@ -1,6 +1,9 @@ package internal import ( + "bytes" + "compress/gzip" + "io/ioutil" "os/exec" "testing" "time" @@ -162,3 +165,20 @@ func TestDuration(t *testing.T) { d.UnmarshalTOML([]byte(`1.5`)) assert.Equal(t, time.Second, d.Duration) } + +func TestCompressWithGzip(t *testing.T) { + testData := "the quick brown fox jumps over the lazy dog" + inputBuffer := bytes.NewBuffer([]byte(testData)) + + outputBuffer, err := CompressWithGzip(inputBuffer) + assert.NoError(t, err) + + gzipReader, err := gzip.NewReader(outputBuffer) + assert.NoError(t, err) + defer gzipReader.Close() + + output, err := ioutil.ReadAll(gzipReader) + assert.NoError(t, err) + + assert.Equal(t, testData, string(output)) +} diff --git a/plugins/outputs/http/http.go b/plugins/outputs/http/http.go index 274e2644d7546..ae84d8c3a9db7 100644 --- a/plugins/outputs/http/http.go +++ b/plugins/outputs/http/http.go @@ -2,7 +2,6 @@ package http import ( "bytes" - "compress/gzip" "context" "fmt" "io" @@ -173,7 +172,7 @@ func (h *HTTP) write(reqBody []byte) error { var err error if h.ContentEncoding == "gzip" && h.Method == http.MethodPost { - reqBodyBuffer, err = compressWithGzip(reqBodyBuffer) + reqBodyBuffer, err = internal.CompressWithGzip(reqBodyBuffer) if err != nil { return err } @@ -210,20 +209,6 @@ func (h *HTTP) write(reqBody []byte) error { return nil } -func compressWithGzip(data io.Reader) (io.Reader, error) { - pr, pw := io.Pipe() - gw := gzip.NewWriter(pw) - var err error - - go func() { - _, err = io.Copy(gw, data) - gw.Close() - pw.Close() - }() - - return pr, err -} - func init() { outputs.Add("http", func() telegraf.Output { return &HTTP{ diff --git a/plugins/outputs/influxdb/http.go b/plugins/outputs/influxdb/http.go index 164261feb623f..f32ad79a44ac6 100644 --- a/plugins/outputs/influxdb/http.go +++ b/plugins/outputs/influxdb/http.go @@ -1,7 +1,6 @@ package influxdb import ( - "compress/gzip" "context" "crypto/tls" "encoding/json" @@ -16,6 +15,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/serializers/influx" ) @@ -360,7 +360,7 @@ func (c *httpClient) makeQueryRequest(query string) (*http.Request, error) { func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) { var err error if c.ContentEncoding == "gzip" { - body, err = compressWithGzip(body) + body, err = internal.CompressWithGzip(body) if err != nil { return nil, err } @@ -381,20 +381,6 @@ func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) { return req, nil } -func compressWithGzip(data io.Reader) (io.Reader, error) { - pr, pw := io.Pipe() - gw := gzip.NewWriter(pw) - var err error - - go func() { - _, err = io.Copy(gw, data) - gw.Close() - pw.Close() - }() - - return pr, err -} - func (c *httpClient) addHeaders(req *http.Request) { if c.Username != "" || c.Password != "" { req.SetBasicAuth(c.Username, c.Password) diff --git a/plugins/outputs/influxdb_v2/http.go b/plugins/outputs/influxdb_v2/http.go index 1e7061a270a39..12826ff92b387 100644 --- a/plugins/outputs/influxdb_v2/http.go +++ b/plugins/outputs/influxdb_v2/http.go @@ -1,7 +1,6 @@ package influxdb_v2 import ( - "compress/gzip" "context" "crypto/tls" "encoding/json" @@ -17,6 +16,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/serializers/influx" ) @@ -231,7 +231,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) { var err error if c.ContentEncoding == "gzip" { - body, err = compressWithGzip(body) + body, err = internal.CompressWithGzip(body) if err != nil { return nil, err } @@ -258,20 +258,6 @@ func (c *httpClient) addHeaders(req *http.Request) { } } -func compressWithGzip(data io.Reader) (io.Reader, error) { - pipeReader, pipeWriter := io.Pipe() - gzipWriter := gzip.NewWriter(pipeWriter) - var err error - - go func() { - _, err = io.Copy(gzipWriter, data) - gzipWriter.Close() - pipeWriter.Close() - }() - - return pipeReader, err -} - func makeWriteURL(loc url.URL, org, bucket string) (string, error) { params := url.Values{} params.Set("bucket", bucket)