Skip to content

Commit

Permalink
Add call to optional Init function for all plugins (influxdata#5899)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and idohalevi committed Sep 23, 2020
1 parent a650c2a commit 62c082c
Show file tree
Hide file tree
Showing 12 changed files with 130 additions and 38 deletions.
46 changes: 45 additions & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,14 @@ func (a *Agent) Run(ctx context.Context) error {
return ctx.Err()
}

log.Printf("D! [agent] Initializing plugins")
err := a.initPlugins()
if err != nil {
return err
}

log.Printf("D! [agent] Connecting outputs")
err := a.connectOutputs(ctx)
err = a.connectOutputs(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -185,6 +191,11 @@ func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error {
}

for _, input := range a.Config.Inputs {
err := input.Init()
if err != nil {
return err
}

select {
case <-ctx.Done():
return nil
Expand Down Expand Up @@ -596,6 +607,39 @@ func (a *Agent) flushOnce(

}

// initPlugins runs the Init function on plugins.
func (a *Agent) initPlugins() error {
for _, input := range a.Config.Inputs {
err := input.Init()
if err != nil {
return fmt.Errorf("could not initialize input %s: %v",
input.Config.Name, err)
}
}
for _, processor := range a.Config.Processors {
err := processor.Init()
if err != nil {
return fmt.Errorf("could not initialize processor %s: %v",
processor.Config.Name, err)
}
}
for _, aggregator := range a.Config.Aggregators {
err := aggregator.Init()
if err != nil {
return fmt.Errorf("could not initialize aggregator %s: %v",
aggregator.Config.Name, err)
}
}
for _, output := range a.Config.Outputs {
err := output.Init()
if err != nil {
return fmt.Errorf("could not initialize output %s: %v",
output.Config.Name, err)
}
}
return nil
}

// connectOutputs connects to all outputs.
func (a *Agent) connectOutputs(ctx context.Context) error {
for _, output := range a.Config.Outputs {
Expand Down
4 changes: 4 additions & 0 deletions docs/AGGREGATORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ var sampleConfig = `
drop_original = false
`

func (m *Min) Init() error {
return nil
}

func (m *Min) SampleConfig() string {
return sampleConfig
}
Expand Down
4 changes: 4 additions & 0 deletions docs/INPUTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func (s *Simple) SampleConfig() string {
`
}

func (s *Simple) Init() error {
return nil
}

func (s *Simple) Gather(acc telegraf.Accumulator) error {
if s.Ok {
acc.AddFields("state", map[string]interface{}{"value": "pretty good"}, nil)
Expand Down
4 changes: 4 additions & 0 deletions docs/OUTPUTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func (s *Simple) SampleConfig() string {
`
}

func (s *Simple) Init() error {
return nil
}

func (s *Simple) Connect() error {
// Make a connection to the URL here
return nil
Expand Down
4 changes: 4 additions & 0 deletions docs/PROCESSORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func (p *Printer) Description() string {
return "Print all metrics that pass through this filter."
}

func (p *Printer) Init() error {
return nil
}

func (p *Printer) Apply(in ...telegraf.Metric) []telegraf.Metric {
for _, metric := range in {
fmt.Println(metric.String())
Expand Down
9 changes: 9 additions & 0 deletions input.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
package telegraf

// Initializer is an interface that all plugin types: Inputs, Outputs,
// Processors, and Aggregators can optionally implement to initialize the
// plugin.
type Initializer interface {
// Init performs one time setup of the plugin and returns an error if the
// configuration is invalid.
Init() error
}

type Input interface {
// SampleConfig returns the default configuration of the Input
SampleConfig() string
Expand Down
10 changes: 10 additions & 0 deletions internal/models/running_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ func (r *RunningAggregator) Name() string {
return "aggregators." + r.Config.Name
}

func (r *RunningAggregator) Init() error {
if p, ok := r.Aggregator.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return err
}
}
return nil
}

func (r *RunningAggregator) Period() time.Duration {
return r.Config.Period
}
Expand Down
10 changes: 10 additions & 0 deletions internal/models/running_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ func (r *RunningInput) metricFiltered(metric telegraf.Metric) {
metric.Drop()
}

func (r *RunningInput) Init() error {
if p, ok := r.Input.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return err
}
}
return nil
}

func (r *RunningInput) MakeMetric(metric telegraf.Metric) telegraf.Metric {
if ok := r.Config.Filter.Select(metric); !ok {
r.metricFiltered(metric)
Expand Down
10 changes: 10 additions & 0 deletions internal/models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ func (ro *RunningOutput) metricFiltered(metric telegraf.Metric) {
metric.Drop()
}

func (ro *RunningOutput) Init() error {
if p, ok := ro.Output.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return err
}
}
return nil
}

// AddMetric adds a metric to the output.
//
// Takes ownership of metric
Expand Down
10 changes: 10 additions & 0 deletions internal/models/running_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ func containsMetric(item telegraf.Metric, metrics []telegraf.Metric) bool {
return false
}

func (rp *RunningProcessor) Init() error {
if p, ok := rp.Processor.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return err
}
}
return nil
}

func (rp *RunningProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {
rp.Lock()
defer rp.Unlock()
Expand Down
33 changes: 15 additions & 18 deletions plugins/inputs/http/http.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package http

import (
"errors"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -89,27 +88,25 @@ func (*HTTP) Description() string {
return "Read formatted metrics from one or more HTTP endpoints"
}

// Gather takes in an accumulator and adds the metrics that the Input
// gathers. This is called every "interval"
func (h *HTTP) Gather(acc telegraf.Accumulator) error {
if h.parser == nil {
return errors.New("Parser is not set")
func (h *HTTP) Init() error {
tlsCfg, err := h.ClientConfig.TLSConfig()
if err != nil {
return err
}

if h.client == nil {
tlsCfg, err := h.ClientConfig.TLSConfig()
if err != nil {
return err
}
h.client = &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
Proxy: http.ProxyFromEnvironment,
},
Timeout: h.Timeout.Duration,
}
h.client = &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
Proxy: http.ProxyFromEnvironment,
},
Timeout: h.Timeout.Duration,
}
return nil
}

// Gather takes in an accumulator and adds the metrics that the Input
// gathers. This is called every "interval"
func (h *HTTP) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
for _, u := range h.URLs {
wg.Add(1)
Expand Down
24 changes: 5 additions & 19 deletions plugins/inputs/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestHTTPwithJSONFormat(t *testing.T) {
plugin.SetParser(p)

var acc testutil.Accumulator
plugin.Init()
require.NoError(t, acc.GatherError(plugin.Gather))

require.Len(t, acc.Metrics, 1)
Expand Down Expand Up @@ -78,6 +79,7 @@ func TestHTTPHeaders(t *testing.T) {
plugin.SetParser(p)

var acc testutil.Accumulator
plugin.Init()
require.NoError(t, acc.GatherError(plugin.Gather))
}

Expand All @@ -100,6 +102,7 @@ func TestInvalidStatusCode(t *testing.T) {
plugin.SetParser(p)

var acc testutil.Accumulator
plugin.Init()
require.Error(t, acc.GatherError(plugin.Gather))
}

Expand All @@ -125,28 +128,10 @@ func TestMethod(t *testing.T) {
plugin.SetParser(p)

var acc testutil.Accumulator
plugin.Init()
require.NoError(t, acc.GatherError(plugin.Gather))
}

func TestParserNotSet(t *testing.T) {
fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/endpoint" {
_, _ = w.Write([]byte(simpleJSON))
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
defer fakeServer.Close()

url := fakeServer.URL + "/endpoint"
plugin := &plugin.HTTP{
URLs: []string{url},
}

var acc testutil.Accumulator
require.Error(t, acc.GatherError(plugin.Gather))
}

const simpleJSON = `
{
"a": 1.2
Expand Down Expand Up @@ -237,6 +222,7 @@ func TestBodyAndContentEncoding(t *testing.T) {
tt.plugin.SetParser(parser)

var acc testutil.Accumulator
tt.plugin.Init()
err = tt.plugin.Gather(&acc)
require.NoError(t, err)
})
Expand Down

0 comments on commit 62c082c

Please sign in to comment.