Skip to content

Commit

Permalink
Add Histogram support aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
Ali Alrahahleh committed Aug 5, 2016
1 parent 06cb5a0 commit 4572d05
Show file tree
Hide file tree
Showing 10 changed files with 656 additions and 12 deletions.
1 change: 1 addition & 0 deletions Godeps
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
github.com/Shopify/sarama 8aadb476e66ca998f2f6bb3c993e9a2daa3666b9
github.com/Sirupsen/logrus 219c8cb75c258c552e999735be6df753ffc7afdc
github.com/VividCortex/gohistogram da38b6e56f2f7dc1999a037141441e50d6213f5d
github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687
github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857
github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4
Expand Down
21 changes: 18 additions & 3 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,28 +242,43 @@ func (a *Agent) flush() {
}
}(o)
}

wg.Wait()
}

// flusher monitors the metrics input channel and flushes on the minimum interval
func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) error {
var metricStream chan telegraf.Metric
stopFilter := make(chan struct{}, len(a.Config.Filters))
// Inelegant, but this sleep is to allow the Gather threads to run, so that
// the flusher will flush after metrics are collected.
time.Sleep(time.Millisecond * 200)

ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration)
metricStream = metricC
if len(a.Config.Filters) != 0 {
for _, name := range a.Config.FiltersOrder {
filter := a.Config.Filters[name]
log.Printf("Filter %s is enabled", name)
metricStream = filter.Pipe(metricStream)
go func(f telegraf.Filter) {
f.Start(stopFilter)
}(filter)
}
}

for {
select {
case <-shutdown:
//sending shutdown signal for all filters
for range a.Config.Filters {
stopFilter <- struct{}{}
}
log.Println("Hang on, flushing any cached metrics before shutdown")
a.flush()
return nil
case <-ticker.C:
internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
a.flush()
case m := <-metricC:
case m := <-metricStream:
for _, o := range a.Config.Outputs {
o.AddMetric(m)
}
Expand Down
25 changes: 23 additions & 2 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (

"github.com/influxdata/telegraf/agent"
"github.com/influxdata/telegraf/internal/config"
"github.com/influxdata/telegraf/plugins/filters"
_ "github.com/influxdata/telegraf/plugins/filters/all"
"github.com/influxdata/telegraf/plugins/inputs"
_ "github.com/influxdata/telegraf/plugins/inputs/all"
"github.com/influxdata/telegraf/plugins/outputs"
Expand All @@ -31,12 +33,16 @@ var fSampleConfig = flag.Bool("sample-config", false,
var fPidfile = flag.String("pidfile", "", "file to write our pid to")
var fInputFilters = flag.String("input-filter", "",
"filter the inputs to enable, separator is :")
var fFilterFilters = flag.String("filter-filter", "",
"filter to be enabled, separator is :")
var fInputList = flag.Bool("input-list", false,
"print available input plugins.")
var fOutputFilters = flag.String("output-filter", "",
"filter the outputs to enable, separator is :")
var fOutputList = flag.Bool("output-list", false,
"print available output plugins.")
var fFilterList = flag.Bool("filter-list", false,
"print available filter plugins.")
var fUsage = flag.String("usage", "",
"print usage for a plugin, ie, 'telegraf -usage mysql'")
var fInputFiltersLegacy = flag.String("filter", "",
Expand Down Expand Up @@ -70,6 +76,8 @@ The flags are:
-input-list print all the plugins inputs
-output-filter filter the output plugins to enable, separator is :
-output-list print all the available outputs
-filter-filter filter the filter plugins to enable, separator is :
-filter-list print all the available filters
-usage print usage for a plugin, ie, 'telegraf -usage mysql'
-debug print metrics as they're generated to stdout
-quiet run in quiet mode
Expand Down Expand Up @@ -121,6 +129,11 @@ func main() {
inputFilters = strings.Split(":"+inputFilter+":", ":")
}

var filterFilters []string
if *fFilterFilters != "" {
filterFilters = strings.Split(":"+strings.TrimSpace(*fFilterFilters)+":", ":")
}

var outputFilters []string
if *fOutputFiltersLegacy != "" {
fmt.Printf("WARNING '--outputfilter' flag is deprecated, please use" +
Expand All @@ -140,7 +153,7 @@ func main() {
fmt.Println(v)
return
case "config":
config.PrintSampleConfig(inputFilters, outputFilters)
config.PrintSampleConfig(inputFilters, filterFilters, outputFilters)
return
}
}
Expand All @@ -153,6 +166,14 @@ func main() {
return
}

if *fFilterList {
fmt.Println("Available Filter Plugins:")
for k, _ := range filters.Filters {
fmt.Printf(" %s\n", k)
}
return
}

if *fInputList {
fmt.Println("Available Input Plugins:")
for k, _ := range inputs.Inputs {
Expand All @@ -168,7 +189,7 @@ func main() {
}

if *fSampleConfig {
config.PrintSampleConfig(inputFilters, outputFilters)
config.PrintSampleConfig(inputFilters, filterFilters, outputFilters)
return
}

Expand Down
15 changes: 15 additions & 0 deletions filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package telegraf

type Filter interface {
// SampleConfig returns the default configuration of the Input
SampleConfig() string

// Description returns a one-sentence description on the Input
Description() string

//create pipe for filter
Pipe(in chan Metric) chan Metric

// start the filter
Start(shutdown chan struct{})
}
103 changes: 96 additions & 7 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/models"
"github.com/influxdata/telegraf/plugins/filters"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/parsers"
Expand Down Expand Up @@ -46,9 +47,11 @@ type Config struct {
InputFilters []string
OutputFilters []string

Agent *AgentConfig
Inputs []*internal_models.RunningInput
Outputs []*internal_models.RunningOutput
Agent *AgentConfig
Inputs []*internal_models.RunningInput
Outputs []*internal_models.RunningOutput
Filters map[string]telegraf.Filter
FiltersOrder []string
}

func NewConfig() *Config {
Expand All @@ -63,6 +66,8 @@ func NewConfig() *Config {
Tags: make(map[string]string),
Inputs: make([]*internal_models.RunningInput, 0),
Outputs: make([]*internal_models.RunningOutput, 0),
Filters: make(map[string]telegraf.Filter),
FiltersOrder: make([]string, 0),
InputFilters: make([]string, 0),
OutputFilters: make([]string, 0),
}
Expand All @@ -77,6 +82,14 @@ type AgentConfig struct {
// ie, if Interval=10s then always collect on :00, :10, :20, etc.
RoundInterval bool

// By default, precision will be set to the same timestamp order as the
// collection interval, with the maximum being 1s.
// ie, when interval = "10s", precision will be "1s"
// when interval = "250ms", precision will be "1ms"
// Precision will NOT be used for service inputs. It is up to each individual
// service input to set the timestamp at the appropriate precision.
Precision internal.Duration

// CollectionJitter is used to jitter the collection by a random amount.
// Each plugin will sleep for a random time within jitter before collecting.
// This can be used to avoid many plugins querying things like sysfs at the
Expand Down Expand Up @@ -108,11 +121,10 @@ type AgentConfig struct {
// does _not_ deactivate FlushInterval.
FlushBufferWhenFull bool

// TODO(cam): Remove UTC and Precision parameters, they are no longer
// TODO(cam): Remove UTC and parameter, they are no longer
// valid for the agent config. Leaving them here for now for backwards-
// compatability
UTC bool `toml:"utc"`
Precision string
UTC bool `toml:"utc"`

// Debug is the option for running in debug mode
Debug bool
Expand Down Expand Up @@ -209,6 +221,11 @@ var header = `# Telegraf Configuration
## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
flush_jitter = "0s"
## By default, precision will be set to the same timestamp order as the
## collection interval, with the maximum being 1s.
## Precision will NOT be used for service inputs, such as logparser and statsd.
## Valid values are "Nns", "Nus" (or "Nµs"), "Nms", "Ns".
precision = ""
## Run telegraf in debug mode
debug = false
## Run telegraf in quiet mode
Expand Down Expand Up @@ -239,7 +256,7 @@ var serviceInputHeader = `
`

// PrintSampleConfig prints the sample config
func PrintSampleConfig(inputFilters []string, outputFilters []string) {
func PrintSampleConfig(inputFilters []string, filterFilters []string, outputFilters []string) {
fmt.Printf(header)

if len(outputFilters) != 0 {
Expand All @@ -257,6 +274,17 @@ func PrintSampleConfig(inputFilters []string, outputFilters []string) {
printFilteredOutputs(pnames, true)
}

if len(filterFilters) != 0 {
printFilteredFilters(filterFilters, false)
} else {
var pnames []string
for pname := range filters.Filters {
pnames = append(pnames, pname)
}
sort.Strings(pnames)
printFilteredFilters(pnames, true)
}

fmt.Printf(inputHeader)
if len(inputFilters) != 0 {
printFilteredInputs(inputFilters, false)
Expand Down Expand Up @@ -315,6 +343,20 @@ func printFilteredInputs(inputFilters []string, commented bool) {
}
}

func printFilteredFilters(printOnly []string, commented bool) {
var names []string
for item := range filters.Filters {
if sliceContains(item, printOnly) {
names = append(names, item)
}
}
sort.Strings(names)
for _, name := range names {
creator := filters.Filters[name]
filter := creator()
printConfig(name, filter, "filters", commented)
}
}
func printFilteredOutputs(outputFilters []string, commented bool) {
// Filter outputs
var onames []string
Expand Down Expand Up @@ -516,6 +558,25 @@ func (c *Config) LoadConfig(path string) error {
pluginName, path)
}
}
case "filter":
for pluginName, pluginVal := range subTable.Fields {
switch pluginSubTable := pluginVal.(type) {
case *ast.Table:
if err = c.addFilter(pluginName, pluginSubTable); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
}
case []*ast.Table:
for _, t := range pluginSubTable {
if err = c.addFilter(pluginName, t); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
}
}
default:
return fmt.Errorf("Unsupported config format: %s, file %s",
pluginName, path)
}
}

// Assume it's an input input for legacy config file support if no other
// identifiers are present
default:
Expand All @@ -527,6 +588,13 @@ func (c *Config) LoadConfig(path string) error {
return nil
}

// trimBOM trims the Byte-Order-Marks from the beginning of the file.
// this is for Windows compatability only.
// see https:/influxdata/telegraf/issues/1378
func trimBOM(f []byte) []byte {
return bytes.TrimPrefix(f, []byte("\xef\xbb\xbf"))
}

// parseFile loads a TOML configuration from a provided path and
// returns the AST produced from the TOML parser. When loading the file, it
// will find environment variables and replace them.
Expand All @@ -535,6 +603,8 @@ func parseFile(fpath string) (*ast.Table, error) {
if err != nil {
return nil, err
}
// ugh windows why
contents = trimBOM(contents)

env_vars := envVarRe.FindAll(contents, -1)
for _, env_var := range env_vars {
Expand Down Expand Up @@ -583,6 +653,25 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
return nil
}

func (c *Config) addFilter(name string, table *ast.Table) error {
creator, ok := filters.Filters[name]
if !ok {
return fmt.Errorf("Undefined but requested filter: %s", name)
}
filter := creator()

if err := config.UnmarshalTable(table, filter); err != nil {
return err
}

if _, ok = c.Filters[name]; ok {
return fmt.Errorf("Filter already defined %s", name)
}
c.Filters[name] = filter
c.FiltersOrder = append(c.FiltersOrder, name)
return nil
}

func (c *Config) addInput(name string, table *ast.Table) error {
if len(c.InputFilters) > 0 && !sliceContains(name, c.InputFilters) {
return nil
Expand Down
5 changes: 5 additions & 0 deletions plugins/filters/all/all.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package all

import (
_ "github.com/influxdata/telegraf/plugins/filters/histogram"
)
Loading

0 comments on commit 4572d05

Please sign in to comment.