Skip to content

Commit

Permalink
Output stats to the Instrumental TCP Collector
Browse files Browse the repository at this point in the history
closes #1139
  • Loading branch information
jasonroelofs authored and sparrc committed May 18, 2016
1 parent be7ca56 commit f32916a
Show file tree
Hide file tree
Showing 6 changed files with 334 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Features

- [#1138](https:/influxdata/telegraf/pull/1138): nstat input plugin. Thanks @Maksadbek!
- [#1139](https:/influxdata/telegraf/pull/1139): instrumental output plugin. Thanks @jasonroelofs!

### Bugfixes

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ want to add support for another service or third-party API.
* [datadog](https:/influxdata/telegraf/tree/master/plugins/outputs/datadog)
* [file](https:/influxdata/telegraf/tree/master/plugins/outputs/file)
* [graphite](https:/influxdata/telegraf/tree/master/plugins/outputs/graphite)
* [instrumental](https:/influxdata/telegraf/tree/master/plugins/outputs/instrumental)
* [kafka](https:/influxdata/telegraf/tree/master/plugins/outputs/kafka)
* [librato](https:/influxdata/telegraf/tree/master/plugins/outputs/librato)
* [mqtt](https:/influxdata/telegraf/tree/master/plugins/outputs/mqtt)
Expand Down
1 change: 1 addition & 0 deletions plugins/outputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/file"
_ "github.com/influxdata/telegraf/plugins/outputs/graphite"
_ "github.com/influxdata/telegraf/plugins/outputs/influxdb"
_ "github.com/influxdata/telegraf/plugins/outputs/instrumental"
_ "github.com/influxdata/telegraf/plugins/outputs/kafka"
_ "github.com/influxdata/telegraf/plugins/outputs/kinesis"
_ "github.com/influxdata/telegraf/plugins/outputs/librato"
Expand Down
25 changes: 25 additions & 0 deletions plugins/outputs/instrumental/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Instrumental Output Plugin

This plugin writes to the [Instrumental Collector API](https://instrumentalapp.com/docs/tcp-collector)
and requires a Project-specific API token.

Instrumental accepts stats in a format very close to Graphite, with the only difference being that
the type of stat (gauge, increment) is the first token, separated from the metric itself
by whitespace. The `increment` type is only used if the metric comes in as a counter through `[[input.statsd]]`.

## Configuration:

```toml
[[outputs.instrumental]]
## Project API Token (required)
api_token = "API Token" # required
## Prefix the metrics with a given name
prefix = ""
## Stats output template (Graphite formatting)
## see https:/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite
template = "host.tags.measurement.field"
## Timeout in seconds to connect
timeout = "2s"
## Debug true - Print communcation to Instrumental
debug = false
```
192 changes: 192 additions & 0 deletions plugins/outputs/instrumental/instrumental.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package instrumental

import (
"fmt"
"io"
"log"
"net"
"regexp"
"strings"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/graphite"
)

type Instrumental struct {
Host string
ApiToken string
Prefix string
DataFormat string
Template string
Timeout internal.Duration
Debug bool

conn net.Conn
}

const (
DefaultHost = "collector.instrumentalapp.com"
AuthFormat = "hello version go/telegraf/1.0\nauthenticate %s\n"
)

var (
StatIncludesBadChar = regexp.MustCompile("[^[:alnum:][:blank:]-_.]")
)

var sampleConfig = `
## Project API Token (required)
api_token = "API Token" # required
## Prefix the metrics with a given name
prefix = ""
## Stats output template (Graphite formatting)
## see https:/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite
template = "host.tags.measurement.field"
## Timeout in seconds to connect
timeout = "2s"
## Display Communcation to Instrumental
debug = false
`

func (i *Instrumental) Connect() error {
connection, err := net.DialTimeout("tcp", i.Host+":8000", i.Timeout.Duration)
if err != nil {
i.conn = nil
return err
}

err = i.authenticate(connection)
if err != nil {
i.conn = nil
return err
}

return nil
}

func (i *Instrumental) Close() error {
i.conn.Close()
i.conn = nil
return nil
}

func (i *Instrumental) Write(metrics []telegraf.Metric) error {
if i.conn == nil {
err := i.Connect()
if err != nil {
return fmt.Errorf("FAILED to (re)connect to Instrumental. Error: %s\n", err)
}
}

s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template)
if err != nil {
return err
}

var points []string
var metricType string
var toSerialize telegraf.Metric
var newTags map[string]string

for _, metric := range metrics {
// Pull the metric_type out of the metric's tags. We don't want the type
// to show up with the other tags pulled from the system, as they go in the
// beginning of the line instead.
// e.g we want:
//
// increment some_prefix.host.tag1.tag2.tag3.field value timestamp
//
// vs
//
// increment some_prefix.host.tag1.tag2.tag3.counter.field value timestamp
//
newTags = metric.Tags()
metricType = newTags["metric_type"]
delete(newTags, "metric_type")

toSerialize, _ = telegraf.NewMetric(
metric.Name(),
newTags,
metric.Fields(),
metric.Time(),
)

stats, err := s.Serialize(toSerialize)
if err != nil {
log.Printf("Error serializing a metric to Instrumental: %s", err)
}

switch metricType {
case "counter":
fallthrough
case "histogram":
metricType = "increment"
default:
metricType = "gauge"
}

for _, stat := range stats {
if !StatIncludesBadChar.MatchString(stat) {
points = append(points, fmt.Sprintf("%s %s", metricType, stat))
} else if i.Debug {
log.Printf("Unable to send bad stat: %s", stat)
}
}
}

allPoints := strings.Join(points, "\n") + "\n"
_, err = fmt.Fprintf(i.conn, allPoints)

if i.Debug {
log.Println(allPoints)
}

if err != nil {
if err == io.EOF {
i.Close()
}

return err
}

return nil
}

func (i *Instrumental) Description() string {
return "Configuration for sending metrics to an Instrumental project"
}

func (i *Instrumental) SampleConfig() string {
return sampleConfig
}

func (i *Instrumental) authenticate(conn net.Conn) error {
_, err := fmt.Fprintf(conn, AuthFormat, i.ApiToken)
if err != nil {
return err
}

// The response here will either be two "ok"s or an error message.
responses := make([]byte, 512)
if _, err = conn.Read(responses); err != nil {
return err
}

if string(responses)[:6] != "ok\nok\n" {
return fmt.Errorf("Authentication failed: %s", responses)
}

i.conn = conn
return nil
}

func init() {
outputs.Add("instrumental", func() telegraf.Output {
return &Instrumental{
Host: DefaultHost,
Template: graphite.DEFAULT_TEMPLATE,
}
})
}
114 changes: 114 additions & 0 deletions plugins/outputs/instrumental/instrumental_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package instrumental

import (
"bufio"
"net"
"net/textproto"
"sync"
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/stretchr/testify/assert"
)

func TestWrite(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go TCPServer(t, &wg)
// Give the fake TCP server some time to start:
time.Sleep(time.Millisecond * 100)

i := Instrumental{
Host: "127.0.0.1",
ApiToken: "abc123token",
Prefix: "my.prefix",
}
i.Connect()

// Default to gauge
m1, _ := telegraf.NewMetric(
"mymeasurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"myfield": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m2, _ := telegraf.NewMetric(
"mymeasurement",
map[string]string{"host": "192.168.0.1", "metric_type": "set"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)

// Simulate a connection close and reconnect.
metrics := []telegraf.Metric{m1, m2}
i.Write(metrics)
i.Close()

// Counter and Histogram are increments
m3, _ := telegraf.NewMetric(
"my_histogram",
map[string]string{"host": "192.168.0.1", "metric_type": "histogram"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
// We will drop metrics that simply won't be accepted by Instrumental
m4, _ := telegraf.NewMetric(
"bad_values",
map[string]string{"host": "192.168.0.1", "metric_type": "counter"},
map[string]interface{}{"value": "\" 3:30\""},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m5, _ := telegraf.NewMetric(
"my_counter",
map[string]string{"host": "192.168.0.1", "metric_type": "counter"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)

metrics = []telegraf.Metric{m3, m4, m5}
i.Write(metrics)

wg.Wait()
i.Close()
}

func TCPServer(t *testing.T, wg *sync.WaitGroup) {
tcpServer, _ := net.Listen("tcp", "127.0.0.1:8000")
defer wg.Done()
conn, _ := tcpServer.Accept()
conn.SetDeadline(time.Now().Add(1 * time.Second))
reader := bufio.NewReader(conn)
tp := textproto.NewReader(reader)

hello, _ := tp.ReadLine()
assert.Equal(t, "hello version go/telegraf/1.0", hello)
auth, _ := tp.ReadLine()
assert.Equal(t, "authenticate abc123token", auth)

conn.Write([]byte("ok\nok\n"))

data1, _ := tp.ReadLine()
assert.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1)
data2, _ := tp.ReadLine()
assert.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2)

conn, _ = tcpServer.Accept()
conn.SetDeadline(time.Now().Add(1 * time.Second))
reader = bufio.NewReader(conn)
tp = textproto.NewReader(reader)

hello, _ = tp.ReadLine()
assert.Equal(t, "hello version go/telegraf/1.0", hello)
auth, _ = tp.ReadLine()
assert.Equal(t, "authenticate abc123token", auth)

conn.Write([]byte("ok\nok\n"))

data3, _ := tp.ReadLine()
assert.Equal(t, "increment my.prefix.192_168_0_1.my_histogram 3.14 1289430000", data3)
data4, _ := tp.ReadLine()
assert.Equal(t, "increment my.prefix.192_168_0_1.my_counter 3.14 1289430000", data4)

conn.Close()
}

0 comments on commit f32916a

Please sign in to comment.