Skip to content

Commit

Permalink
Test different adapters; add ForceEncode to statsd label encoder
Browse files Browse the repository at this point in the history
  • Loading branch information
jmacd committed Nov 22, 2019
1 parent e50c004 commit 4cf2f3c
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 72 deletions.
2 changes: 1 addition & 1 deletion example/basic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func initMeter() *push.Controller {
if err != nil {
log.Panicf("failed to initialize metric stdout exporter %v", err)
}
batcher := defaultkeys.New(selector, metricsdk.DefaultLabelEncoder(), true)
batcher := defaultkeys.New(selector, metricsdk.NewDefaultLabelEncoder(), true)
pusher := push.New(batcher, exporter, time.Second)
pusher.Start()

Expand Down
14 changes: 6 additions & 8 deletions exporter/metric/dogstatsd/dogstatsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type (
Exporter struct {
*statsd.Exporter
*statsd.LabelEncoder

ReencodedLabelsCount int
}
)

Expand Down Expand Up @@ -64,14 +66,10 @@ func (*Exporter) AppendName(rec export.Record, buf *bytes.Buffer) {

// AppendTags is part of the stats-internal adapter interface.
func (e *Exporter) AppendTags(rec export.Record, buf *bytes.Buffer) {
labels := rec.Labels()
encoded, inefficient := e.LabelEncoder.ForceEncode(rec.Labels())
_, _ = buf.WriteString(encoded)

if labels.Encoder() != e {
// TODO: This case could be handled by directly
// encoding the labels at this point, but presently it
// should not occur.
panic("Should have self-encoded labels")
if inefficient {
e.ReencodedLabelsCount++
}

_, _ = buf.WriteString(labels.Encoded())
}
69 changes: 69 additions & 0 deletions exporter/metric/dogstatsd/dogstatsd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dogstatsd_test

import (
"bytes"
"context"
"fmt"
"testing"

"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/exporter/metric/dogstatsd"
"go.opentelemetry.io/otel/exporter/metric/internal/statsd"
"go.opentelemetry.io/otel/exporter/metric/test"
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/counter"
)

// TestDogstatsLabels that labels are formatted in the correct style,
// whether or not the provided labels were encoded by a statsd label
// encoder.
func TestDogstatsLabels(t *testing.T) {
for inefficientCount, encoder := range []export.LabelEncoder{
statsd.NewLabelEncoder(), // inefficientCount == 0
sdk.NewDefaultLabelEncoder(), // inefficientCount == 1
} {
t.Run(fmt.Sprintf("%T", encoder), func(t *testing.T) {
ctx := context.Background()
checkpointSet := test.NewCheckpointSet(encoder)

desc := export.NewDescriptor("test.name", export.CounterKind, nil, "", "", core.Int64NumberKind, false)
cagg := counter.New()
_ = cagg.Update(ctx, core.NewInt64Number(123), desc)
cagg.Checkpoint(ctx, desc)

checkpointSet.Add(desc, cagg, key.New("A").String("B"))

var buf bytes.Buffer
exp, err := dogstatsd.New(dogstatsd.Config{
Writer: &buf,
})
require.Nil(t, err)
require.Equal(t, 0, exp.ReencodedLabelsCount)

err = exp.Export(ctx, checkpointSet)
require.Nil(t, err)

require.Equal(t, inefficientCount, exp.ReencodedLabelsCount)

require.Equal(t, "test.name:123|c|#A:B\n", buf.String())
})
}
}
156 changes: 99 additions & 57 deletions exporter/metric/internal/statsd/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,30 +30,53 @@ import (
"go.opentelemetry.io/otel/exporter/metric/internal/statsd"
"go.opentelemetry.io/otel/exporter/metric/test"
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/array"
"go.opentelemetry.io/otel/sdk/metric/aggregator/counter"
"go.opentelemetry.io/otel/sdk/metric/aggregator/gauge"
)

type testAdapter struct {
// withTagsAdapter tests a dogstatsd-style statsd exporter.
type withTagsAdapter struct {
*statsd.LabelEncoder
}

func (*testAdapter) AppendName(rec export.Record, buf *bytes.Buffer) {
func (*withTagsAdapter) AppendName(rec export.Record, buf *bytes.Buffer) {
_, _ = buf.WriteString(rec.Descriptor().Name())
}

func (*testAdapter) AppendTags(rec export.Record, buf *bytes.Buffer) {
labels := rec.Labels()
_, _ = buf.WriteString(labels.Encoded())
func (ta *withTagsAdapter) AppendTags(rec export.Record, buf *bytes.Buffer) {
encoded, _ := ta.LabelEncoder.ForceEncode(rec.Labels())
_, _ = buf.WriteString(encoded)
}

func newAdapter() *testAdapter {
return &testAdapter{
func newWithTagsAdapter() *withTagsAdapter {
return &withTagsAdapter{
statsd.NewLabelEncoder(),
}
}

// noTagsAdapter simulates a plain-statsd exporter that appends tag
// values to the metric name.
type noTagsAdapter struct {
}

func (*noTagsAdapter) AppendName(rec export.Record, buf *bytes.Buffer) {
_, _ = buf.WriteString(rec.Descriptor().Name())

for _, tag := range rec.Labels().Ordered() {
_, _ = buf.WriteString(".")
_, _ = buf.WriteString(tag.Value.Emit())
}
}

func (*noTagsAdapter) AppendTags(rec export.Record, buf *bytes.Buffer) {
}

func newNoTagsAdapter() *noTagsAdapter {
return &noTagsAdapter{}
}

type testWriter struct {
vec []string
}
Expand Down Expand Up @@ -95,61 +118,80 @@ func measureAgg(desc *export.Descriptor, v float64) export.Aggregator {
}

func TestBasicFormat(t *testing.T) {
for _, nkind := range []core.NumberKind{
core.Float64NumberKind,
core.Int64NumberKind,
type adapterOutput struct {
adapter statsd.Adapter
expected string
}

for _, ao := range []adapterOutput{{
adapter: newWithTagsAdapter(),
expected: `counter:%s|c|#A:B,C:D
gauge:%s|g|#A:B,C:D
measure:%s|h|#A:B,C:D
timer:%s|ms|#A:B,C:D
`}, {
adapter: newNoTagsAdapter(),
expected: `counter.B.D:%s|c
gauge.B.D:%s|g
measure.B.D:%s|h
timer.B.D:%s|ms
`},
} {
t.Run(nkind.String(), func(t *testing.T) {
ctx := context.Background()
writer := &testWriter{}
config := statsd.Config{
Writer: writer,
MaxPacketSize: 1024,
}
adapter := newAdapter()
exp, err := statsd.NewExporter(config, adapter)
if err != nil {
t.Fatal("New error: ", err)
}
adapter := ao.adapter
expected := ao.expected
t.Run(fmt.Sprintf("%T", adapter), func(t *testing.T) {
for _, nkind := range []core.NumberKind{
core.Float64NumberKind,
core.Int64NumberKind,
} {
t.Run(nkind.String(), func(t *testing.T) {
ctx := context.Background()
writer := &testWriter{}
config := statsd.Config{
Writer: writer,
MaxPacketSize: 1024,
}
exp, err := statsd.NewExporter(config, adapter)
if err != nil {
t.Fatal("New error: ", err)
}

checkpointSet := test.NewCheckpointSet(adapter.LabelEncoder)
cdesc := export.NewDescriptor(
"counter", export.CounterKind, nil, "", "", nkind, false)
gdesc := export.NewDescriptor(
"gauge", export.GaugeKind, nil, "", "", nkind, false)
mdesc := export.NewDescriptor(
"measure", export.MeasureKind, nil, "", "", nkind, false)
tdesc := export.NewDescriptor(
"timer", export.MeasureKind, nil, "", unit.Milliseconds, nkind, false)

labels := []core.KeyValue{
key.New("A").String("B"),
key.New("C").String("D"),
}
const value = 123.456
checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder())
cdesc := export.NewDescriptor(
"counter", export.CounterKind, nil, "", "", nkind, false)
gdesc := export.NewDescriptor(
"gauge", export.GaugeKind, nil, "", "", nkind, false)
mdesc := export.NewDescriptor(
"measure", export.MeasureKind, nil, "", "", nkind, false)
tdesc := export.NewDescriptor(
"timer", export.MeasureKind, nil, "", unit.Milliseconds, nkind, false)

checkpointSet.Add(cdesc, counterAgg(cdesc, value), labels...)
checkpointSet.Add(gdesc, gaugeAgg(gdesc, value), labels...)
checkpointSet.Add(mdesc, measureAgg(mdesc, value), labels...)
checkpointSet.Add(tdesc, measureAgg(tdesc, value), labels...)
labels := []core.KeyValue{
key.New("A").String("B"),
key.New("C").String("D"),
}
const value = 123.456

err = exp.Export(ctx, checkpointSet)
require.Nil(t, err)
checkpointSet.Add(cdesc, counterAgg(cdesc, value), labels...)
checkpointSet.Add(gdesc, gaugeAgg(gdesc, value), labels...)
checkpointSet.Add(mdesc, measureAgg(mdesc, value), labels...)
checkpointSet.Add(tdesc, measureAgg(tdesc, value), labels...)

var vfmt string
if nkind == core.Int64NumberKind {
fv := float64(value)
vfmt = strconv.FormatInt(int64(fv), 10)
} else {
vfmt = strconv.FormatFloat(value, 'g', -1, 64)
}
err = exp.Export(ctx, checkpointSet)
require.Nil(t, err)

require.Equal(t, 1, len(writer.vec))
require.Equal(t, fmt.Sprintf(`counter:%s|c|#A:B,C:D
gauge:%s|g|#A:B,C:D
measure:%s|h|#A:B,C:D
timer:%s|ms|#A:B,C:D
`, vfmt, vfmt, vfmt, vfmt), writer.vec[0])
var vfmt string
if nkind == core.Int64NumberKind {
fv := float64(value)
vfmt = strconv.FormatInt(int64(fv), 10)
} else {
vfmt = strconv.FormatFloat(value, 'g', -1, 64)
}

require.Equal(t, 1, len(writer.vec))
require.Equal(t, fmt.Sprintf(expected, vfmt, vfmt, vfmt, vfmt), writer.vec[0])
})
}
})
}
}
Expand Down Expand Up @@ -270,7 +312,7 @@ func TestPacketSplit(t *testing.T) {
Writer: writer,
MaxPacketSize: 1024,
}
adapter := newAdapter()
adapter := newWithTagsAdapter()
exp, err := statsd.NewExporter(config, adapter)
if err != nil {
t.Fatal("New error: ", err)
Expand Down
19 changes: 19 additions & 0 deletions exporter/metric/internal/statsd/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ type LabelEncoder struct {
pool sync.Pool
}

// sameCheck is used to test whether label encoders are the same.
type sameCheck interface {
isStatsd()
}

var _ export.LabelEncoder = &LabelEncoder{}

// NewLabelEncoder returns a new encoder for dogstatsd-syntax metric
Expand Down Expand Up @@ -63,3 +68,17 @@ func (e *LabelEncoder) Encode(labels []core.KeyValue) string {
}
return buf.String()
}

func (e *LabelEncoder) isStatsd() {}

// ForceEncode returns a statsd label encoding, even if the exported
// labels were encoded by a different type of encoder. Returns a
// boolean to indicate whether the labels were in fact re-encoded, to
// test for (and warn about) efficiency.
func (e *LabelEncoder) ForceEncode(labels export.Labels) (string, bool) {
if _, ok := labels.Encoder().(sameCheck); ok {
return labels.Encoded(), false
}

return e.Encode(labels.Ordered()), true
}
44 changes: 38 additions & 6 deletions exporter/metric/internal/statsd/labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,52 @@ import (
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/exporter/metric/internal/statsd"
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
)

func TestLabelSytnax(t *testing.T) {
var testLabels = []core.KeyValue{
key.New("A").String("B"),
key.New("C").String("D"),
key.New("E").Float64(1.5),
}

func TestLabelSyntax(t *testing.T) {
encoder := statsd.NewLabelEncoder()

require.Equal(t, `|#A:B,C:D,E:1.5`, encoder.Encode([]core.KeyValue{
key.New("A").String("B"),
key.New("C").String("D"),
key.New("E").Float64(1.5),
}))
require.Equal(t, `|#A:B,C:D,E:1.5`, encoder.Encode(testLabels))

require.Equal(t, `|#A:B`, encoder.Encode([]core.KeyValue{
key.New("A").String("B"),
}))

require.Equal(t, "", encoder.Encode(nil))
}

func TestLabelForceEncode(t *testing.T) {
defaultLabelEncoder := sdk.NewDefaultLabelEncoder()
statsdLabelEncoder := statsd.NewLabelEncoder()

exportLabelsDefault := export.NewLabels(testLabels, defaultLabelEncoder.Encode(testLabels), defaultLabelEncoder)
exportLabelsStatsd := export.NewLabels(testLabels, statsdLabelEncoder.Encode(testLabels), statsdLabelEncoder)

statsdEncoding := exportLabelsStatsd.Encoded()
require.NotEqual(t, statsdEncoding, exportLabelsDefault.Encoded())

forced, repeat := statsdLabelEncoder.ForceEncode(exportLabelsDefault)
require.Equal(t, statsdEncoding, forced)
require.True(t, repeat)

forced, repeat = statsdLabelEncoder.ForceEncode(exportLabelsStatsd)
require.Equal(t, statsdEncoding, forced)
require.False(t, repeat)

// Check that this works for an embedded implementation.
exportLabelsEmbed := export.NewLabels(testLabels, statsdEncoding, struct {
*statsd.LabelEncoder
}{LabelEncoder: statsdLabelEncoder})

forced, repeat = statsdLabelEncoder.ForceEncode(exportLabelsEmbed)
require.Equal(t, statsdEncoding, forced)
require.False(t, repeat)
}

0 comments on commit 4cf2f3c

Please sign in to comment.