Skip to content

Commit

Permalink
Create Filter plugin support
Browse files Browse the repository at this point in the history
Move Histogram as filter plugin
  • Loading branch information
Ali Alrahahleh committed Jun 14, 2016
2 parents 3a9c6ba + 1b05b45 commit d2390e9
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 80 deletions.
61 changes: 8 additions & 53 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package agent

import (
"fmt"
"github.com/VividCortex/gohistogram"
"log"
"os"
"runtime"
"strconv"
"sync"
"time"

Expand All @@ -19,16 +17,12 @@ import (
// Agent runs telegraf and collects data based on the given config
type Agent struct {
Config *config.Config
fieldMap map[string]map[string]*gohistogram.NumericHistogram
metricTags map[string]map[string]string
}

// NewAgent returns an Agent struct based off the given Config
func NewAgent(config *config.Config) (*Agent, error) {
a := &Agent{
Config: config,
fieldMap: make(map[string]map[string]*gohistogram.NumericHistogram),
metricTags: make(map[string]map[string]string),
}

if !a.Config.Agent.OmitHostname {
Expand Down Expand Up @@ -241,16 +235,20 @@ func (a *Agent) flush() {
for _, o := range a.Config.Outputs {
go func(output *internal_models.RunningOutput) {
defer wg.Done()
a.AddHistMetricToOutput(output)
if a.Config.Filter != nil {
a.Config.Filter.OutputMetric(output)
}
err := output.Write()
if err != nil {
log.Printf("Error writing to output [%s]: %s\n",
output.Name, err.Error())
}
}(o)
}

wg.Wait()
if a.Config.Filter != nil {
a.Config.Filter.Reset()
}
}

// flusher monitors the metrics input channel and flushes on the minimum interval
Expand All @@ -271,8 +269,8 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
a.flush()
case m := <-metricC:
if _, ok := a.Config.Agent.Histogram[m.Name()]; ok {
a.AddMetric(m)
if a.Config.Filter != nil && a.Config.Filter.IsEnabled(m.Name()) {
a.Config.Filter.AddMetric(m)
} else {
for _, o := range a.Config.Outputs {
o.AddMetric(m)
Expand All @@ -282,49 +280,6 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
}
}

func (a *Agent) AddMetric(metric telegraf.Metric) {
name := metric.Name()
if a.fieldMap[name] == nil {
a.fieldMap[name] = make(map[string]*gohistogram.NumericHistogram)
}
if a.metricTags[name] == nil {
a.metricTags[name] = make(map[string]string)
}
a.metricTags[name] = metric.Tags()
for key, val := range metric.Fields() {
switch v := val.(type) {
case float64:
if a.fieldMap[name][key] == nil {
a.fieldMap[name][key] = gohistogram.NewHistogram(a.Config.Agent.HistogramBuckSize)
}
hist := a.fieldMap[name][key]
hist.Add(v)
default:
log.Printf("When stats enabled all the fields should be of type float64 [field name %s]", key)
}
}
}

func (a *Agent) AddHistMetricToOutput(output *internal_models.RunningOutput) {
for name, fields := range a.fieldMap {
mFields := make(map[string]interface{})
for key, val := range fields {
for _, perc := range a.Config.Agent.Histogram[name] {
p := strconv.FormatFloat(perc*100, 'f', 0, 64)
mFields[key+"_p"+p] = val.Quantile(perc)
}
mFields[key+"_variance"] = val.Variance()
mFields[key+"_mean"] = val.Mean()
mFields[key+"_count"] = val.Count()
}
m, _ := telegraf.NewMetric(name, a.metricTags[name], mFields, time.Now().UTC())
output.AddMetric(m)

delete(a.fieldMap, name)
delete(a.metricTags, name)
}
}

// Run runs the agent daemon, gathering every Interval
func (a *Agent) Run(shutdown chan struct{}) error {
var wg sync.WaitGroup
Expand Down
1 change: 1 addition & 0 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/all"
"github.com/influxdata/telegraf/plugins/outputs"
_ "github.com/influxdata/telegraf/plugins/outputs/all"
_ "github.com/influxdata/telegraf/plugins/filters/all"
)

var fDebug = flag.Bool("debug", false,
Expand Down
20 changes: 20 additions & 0 deletions filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package telegraf


type Filter interface {
// SampleConfig returns the default configuration of the Input
SampleConfig() string

// Description returns a one-sentence description on the Input
Description() string

//Output metric to outputs list
OutputMetric(output interface{})
//Add metric to the middleware
AddMetric(metric Metric)
//Called on each metric to check if this middle ware enabled
//or not for that metric
IsEnabled(name string) bool
//clear metrics to output
Reset()
}
60 changes: 33 additions & 27 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/influxdata/telegraf/internal/models"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/filters"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/serializers"

Expand Down Expand Up @@ -49,6 +50,7 @@ type Config struct {
Agent *AgentConfig
Inputs []*internal_models.RunningInput
Outputs []*internal_models.RunningOutput
Filter telegraf.Filter
}

func NewConfig() *Config {
Expand All @@ -58,7 +60,6 @@ func NewConfig() *Config {
Interval: internal.Duration{Duration: 10 * time.Second},
RoundInterval: true,
FlushInterval: internal.Duration{Duration: 10 * time.Second},
HistogramBuckSize: 20,
},

Tags: make(map[string]string),
Expand Down Expand Up @@ -123,14 +124,6 @@ type AgentConfig struct {
Hostname string
OmitHostname bool

// Supported Histogram method
// (metric_name) = list of percintil (0.95, 0.39)
// Note if Histogram is enabled for a metric All field will be
// Sampled
Histogram map[string][]float64

// Histogram bucketsize
HistogramBuckSize int
}

// Inputs returns a list of strings of the configured inputs.
Expand Down Expand Up @@ -227,24 +220,6 @@ var header = `# Telegraf Configuration
hostname = ""
## If set to true, do no set the "host" tag in the telegraf agent.
omit_hostname = false
## Supported Histogram method
## (metric_name) = list of percintile (0.95, 0.39)
## Note if Histogram is enabled for a metric. All field will
## be Sampled
## value generated are approxmation please refer to
## Ben-Haim & Yom-Tov's A Streaming Parallel Decision Tree Algorithm
## http://jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf
##[agent.histogram]
## (metric_name) = (percintile list ex [0.95, 0.50])
## Empty array will generate count median and variance
## (metric_name) = []
##Histogram bucket size
##A larger bin size yields more accurate approximations at the
##cost of increased memory utilization and performance
##histogram_buck_size = 20
###############################################################################
Expand Down Expand Up @@ -544,6 +519,21 @@ func (c *Config) LoadConfig(path string) error {
pluginName, path)
}
}
case "filter":
for pluginName, pluginVal := range subTable.Fields {
switch pluginSubTable := pluginVal.(type) {
case *ast.Table:
if err = c.addFilter(pluginName, pluginSubTable); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
}
case []*ast.Table:
return fmt.Errorf("You can only specify one filter.", path, err)
default:
return fmt.Errorf("Unsupported config format: %s, file %s",
pluginName, path)
}
}

// Assume it's an input input for legacy config file support if no other
// identifiers are present
default:
Expand Down Expand Up @@ -611,6 +601,22 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
return nil
}

func (c *Config) addFilter(name string, table *ast.Table) error {
fmt.Printf("%v", filters.Filters)
creator, ok := filters.Filters[name]
if !ok {
return fmt.Errorf("Undefined but requested filter: %s", name)
}
filter := creator()

if err := config.UnmarshalTable(table, filter); err != nil {
return err
}

c.Filter = filter
return nil
}

func (c *Config) addInput(name string, table *ast.Table) error {
if len(c.InputFilters) > 0 && !sliceContains(name, c.InputFilters) {
return nil
Expand Down
5 changes: 5 additions & 0 deletions plugins/filters/all/all.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package all

import (
_ "github.com/influxdata/telegraf/plugins/filters/histogram"
)
110 changes: 110 additions & 0 deletions plugins/filters/histogram/histogram.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package histogram

import (
"github.com/VividCortex/gohistogram"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/filters"
"github.com/influxdata/telegraf/internal/models"
"strconv"
"log"
"sync"
"time"
)

type Histogram struct {
sync.RWMutex
Bucketsize int
Metrics map[string][]float64
fieldMap map[string]map[string]*gohistogram.NumericHistogram
metricTags map[string]map[string]string
}

func (h *Histogram) Description() string {
return "Read Histogram-formatted JSON metrics from one or more HTTP endpoints"
}

func (h *Histogram) SampleConfig() string {
return `
## Histogram Filter
## This filter can be used to generate
## (mean varince percentile count)
## values generated are approxmation please refer to
## Ben-Haim & Yom-Tov's A Streaming Parallel Decision Tree Algorithm
## http://jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf
##[filter.histogram]
## bucket size if increase it will increase accuracy
## but it will increase memory usage
## bucketsize = 20
## [filter.histogram.metrics]
## metric name = [percentiles] ## if array is empty only count mean
## and variance will be cacluated
## tail = [0.90]
`
}

func (h *Histogram) AddMetric(metric telegraf.Metric) {
name := metric.Name()
h.Lock()
defer h.Unlock()
if h.fieldMap[name] == nil {
h.fieldMap[name] = make(map[string]*gohistogram.NumericHistogram)
}
if h.metricTags[name] == nil {
h.metricTags[name] = make(map[string]string)
}
h.metricTags[name] = metric.Tags()
for key, val := range metric.Fields() {
switch v := val.(type) {
case float64:
if h.fieldMap[name][key] == nil {
h.fieldMap[name][key] = gohistogram.NewHistogram(h.Bucketsize)
}
hist := h.fieldMap[name][key]
hist.Add(v)
default:
log.Printf("When stats enabled all the fields should be of type float64 [field name %s]", key)
}
}
}

func (h *Histogram) IsEnabled(name string) bool {
_, ok := h.Metrics[name]
return ok
}

func (h *Histogram) OutputMetric(output interface{}) {
h.RLock()
defer h.RUnlock()
for name, fields := range h.fieldMap {
mFields := make(map[string]interface{})
for key, val := range fields {
for _, perc := range h.Metrics[name] {
p := strconv.FormatFloat(perc*100, 'f', 0, 64)
mFields[key+"_p"+p] = val.Quantile(perc)
}
mFields[key+"_variance"] = val.Variance()
mFields[key+"_mean"] = val.Mean()
mFields[key+"_count"] = val.Count()
}
metric, _ := telegraf.NewMetric(name, h.metricTags[name], mFields, time.Now().UTC())
if out, ok := output.(*internal_models.RunningOutput); ok {
out.AddMetric(metric)
}
}
}

func (h *Histogram) Reset() {
h.Lock()
defer h.Unlock()
h.fieldMap = make(map[string]map[string]*gohistogram.NumericHistogram)
h.metricTags = make(map[string]map[string]string)
}

func init() {
filters.Add("histogram", func() telegraf.Filter {
return &Histogram{
fieldMap: make(map[string]map[string]*gohistogram.NumericHistogram),
metricTags: make(map[string]map[string]string),
}
})
}
11 changes: 11 additions & 0 deletions plugins/filters/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package filters

import "github.com/influxdata/telegraf"

type Creator func() telegraf.Filter

var Filters = map[string]Creator{}

func Add(name string, creator Creator) {
Filters[name] = creator
}

0 comments on commit d2390e9

Please sign in to comment.