diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 2e60ab14cd6..751d09e06df 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -16,6 +16,8 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] *Filebeat* +- Fixed error spam from `add_kubernetes_metadata` processor when running on AKS. {pull}33697[33697] +- Metrics hosted by the HTTP monitoring endpoint for the `aws-cloudwatch`, `aws-s3`, `cel`, and `lumberjack` inputs are now available under `/inputs/` instead of `/dataset`. *Heartbeat* diff --git a/NOTICE.txt b/NOTICE.txt index ab5c1b20129..bf18a351f63 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -15758,6 +15758,75 @@ Contents of probable licence file $GOMODCACHE/github.com/gorhill/cronexpr@v0.0.0 limitations under the License. +-------------------------------------------------------------------------------- +Dependency : github.com/gorilla/handlers +Version: v1.5.1 +Licence type (autodetected): BSD-2-Clause +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/gorilla/handlers@v1.5.1/LICENSE: + +Copyright (c) 2013 The Gorilla Handlers Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +-------------------------------------------------------------------------------- +Dependency : github.com/gorilla/mux +Version: v1.8.0 +Licence type (autodetected): BSD-3-Clause +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/gorilla/mux@v1.8.0/LICENSE: + +Copyright (c) 2012-2018 The Gorilla Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + -------------------------------------------------------------------------------- Dependency : github.com/h2non/filetype Version: v1.1.1 @@ -33082,6 +33151,35 @@ Contents of probable licence file $GOMODCACHE/github.com/fearful-symmetry/gomsr@ limitations under the License. +-------------------------------------------------------------------------------- +Dependency : github.com/felixge/httpsnoop +Version: v1.0.1 +Licence type (autodetected): MIT +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/felixge/httpsnoop@v1.0.1/LICENSE.txt: + +Copyright (c) 2016 Felix Geisendörfer (felix@debuggable.com) + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. + + -------------------------------------------------------------------------------- Dependency : github.com/form3tech-oss/jwt-go Version: v3.2.3+incompatible @@ -35765,43 +35863,6 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. --------------------------------------------------------------------------------- -Dependency : github.com/gorilla/mux -Version: v1.8.0 -Licence type (autodetected): BSD-3-Clause --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/github.com/gorilla/mux@v1.8.0/LICENSE: - -Copyright (c) 2012-2018 The Gorilla Authors. All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are -met: - - * Redistributions of source code must retain the above copyright -notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above -copyright notice, this list of conditions and the following disclaimer -in the documentation and/or other materials provided with the -distribution. - * Neither the name of Google Inc. nor the names of its -contributors may be used to endorse or promote products derived from -this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - -------------------------------------------------------------------------------- Dependency : github.com/gorilla/websocket Version: v1.4.2 diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 765f0eea2c1..0807d8e7958 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -39,6 +39,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/management" + "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" "github.com/elastic/beats/v7/libbeat/publisher/pipetool" "github.com/elastic/beats/v7/libbeat/statestore" @@ -117,6 +118,12 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea return nil, err } + if b.API != nil { + if err = inputmon.AttachHandler(b.API.Router()); err != nil { + return nil, fmt.Errorf("failed attach inputs api to monitoring endpoint server: %w", err) + } + } + // Add inputs created by the modules config.Inputs = append(config.Inputs, moduleInputs...) diff --git a/filebeat/docs/index.asciidoc b/filebeat/docs/index.asciidoc index aabcc2b9cf4..4399fc17e28 100644 --- a/filebeat/docs/index.asciidoc +++ b/filebeat/docs/index.asciidoc @@ -23,6 +23,7 @@ include::{asciidoc-dir}/../../shared/attributes.asciidoc[] :has_kubernetes_logs_path_matcher: :has_nomad_logs_path_matcher: :has_registry: +:has_inputs_endpoint: :deb_os: :rpm_os: :mac_os: diff --git a/go.mod b/go.mod index e9405c2e4b9..43f21dfe3f5 100644 --- a/go.mod +++ b/go.mod @@ -201,6 +201,8 @@ require ( github.com/elastic/toutoumomoma v0.0.0-20221026030040-594ef30cb640 github.com/google/cel-go v0.12.5 github.com/googleapis/gax-go/v2 v2.5.1 + github.com/gorilla/handlers v1.5.1 + github.com/gorilla/mux v1.8.0 github.com/pierrec/lz4/v4 v4.1.15 github.com/shirou/gopsutil/v3 v3.21.12 go.elastic.co/apm/module/apmelasticsearch/v2 v2.0.0 @@ -257,6 +259,7 @@ require ( github.com/elastic/go-windows v1.0.1 // indirect github.com/evanphx/json-patch v4.12.0+incompatible // indirect github.com/fearful-symmetry/gomsr v0.0.1 // indirect + github.com/felixge/httpsnoop v1.0.1 // indirect github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect github.com/go-logfmt/logfmt v0.5.1 // indirect github.com/go-logr/logr v1.2.3 // indirect diff --git a/go.sum b/go.sum index 447cf675a89..a38b1debf8d 100644 --- a/go.sum +++ b/go.sum @@ -704,6 +704,8 @@ github.com/fearful-symmetry/gomsr v0.0.1 h1:m208RzdTApWVbv8a9kf78rdPLQe+BY9AxRb/ github.com/fearful-symmetry/gomsr v0.0.1/go.mod h1:Qb/0Y7zwobP7v8Sji+M5mlL4N7Voyz5WaKXXRFPnLio= github.com/fearful-symmetry/gorapl v0.0.4 h1:TMn4fhhtIAd+C3NrAl638oaYlX1vgcKNVVdad53oyiE= github.com/fearful-symmetry/gorapl v0.0.4/go.mod h1:XoeZ+5v0tJX9WMvzqdPaaKAdX7y17mDN3pxDGemINR0= +github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= +github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c= @@ -1049,6 +1051,8 @@ github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 h1:f0n1xnMSmBLzVf github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75/go.mod h1:g2644b03hfBX9Ov0ZBDgXXens4rxSxmqFBbhvKv2yVA= github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/handlers v0.0.0-20150720190736-60c7bfde3e33/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ= +github.com/gorilla/handlers v1.5.1 h1:9lRY6j8DEeeBT10CvO9hGW0gmky0BprnvDI5vfhUHH4= +github.com/gorilla/handlers v1.5.1/go.mod h1:t8XrUpc4KVXb7HGyJ4/cEnwQiaxrX/hz1Zv/4g96P1Q= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.7.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= diff --git a/libbeat/api/config.go b/libbeat/api/config.go index f6e57551b36..acd74ef4177 100644 --- a/libbeat/api/config.go +++ b/libbeat/api/config.go @@ -28,14 +28,12 @@ type Config struct { SecurityDescriptor string `config:"named_pipe.security_descriptor"` } -var ( - // DefaultConfig is the default configuration used by the API endpoint. - DefaultConfig = Config{ - Enabled: false, - Host: "localhost", - Port: 5066, - } -) +// DefaultConfig is the default configuration used by the API endpoint. +var DefaultConfig = Config{ + Enabled: false, + Host: "localhost", + Port: 5066, +} // File mode for the socket file, owner of the process can do everything, member of the group can read. -const socketFileMode = os.FileMode(0740) +const socketFileMode = os.FileMode(0o740) diff --git a/libbeat/api/routes.go b/libbeat/api/routes.go index 0cc06cde153..fd0bcc8ca41 100644 --- a/libbeat/api/routes.go +++ b/libbeat/api/routes.go @@ -22,33 +22,37 @@ import ( "net/http" "net/url" + "go.uber.org/multierr" + "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/monitoring" ) -type handlerFunc func(http.ResponseWriter, *http.Request) type lookupFunc func(string) *monitoring.Namespace -var handlerFuncMap = make(map[string]handlerFunc) - // NewWithDefaultRoutes creates a new server with default API routes. func NewWithDefaultRoutes(log *logp.Logger, config *config.C, ns lookupFunc) (*Server, error) { - mux := http.NewServeMux() - - mux.HandleFunc("/", makeRootAPIHandler(makeAPIHandler(ns("info")))) - mux.HandleFunc("/state", makeAPIHandler(ns("state"))) - mux.HandleFunc("/stats", makeAPIHandler(ns("stats"))) - mux.HandleFunc("/dataset", makeAPIHandler(ns("dataset"))) + api, err := New(log, config) + if err != nil { + return nil, err + } - for api, h := range handlerFuncMap { - mux.HandleFunc(api, h) + err = multierr.Combine( + api.AttachHandler("/", makeRootAPIHandler(makeAPIHandler(ns("info")))), + api.AttachHandler("/state", makeAPIHandler(ns("state"))), + api.AttachHandler("/stats", makeAPIHandler(ns("stats"))), + api.AttachHandler("/dataset", makeAPIHandler(ns("dataset"))), + ) + if err != nil { + return nil, err } - return New(log, mux, config) + + return api, nil } -func makeRootAPIHandler(handler handlerFunc) handlerFunc { +func makeRootAPIHandler(handler http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if r.URL.Path != "/" { http.NotFound(w, r) @@ -58,7 +62,7 @@ func makeRootAPIHandler(handler handlerFunc) handlerFunc { } } -func makeAPIHandler(ns *monitoring.Namespace) handlerFunc { +func makeAPIHandler(ns *monitoring.Namespace) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=utf-8") @@ -80,12 +84,3 @@ func prettyPrint(w http.ResponseWriter, data mapstr.M, u *url.URL) { fmt.Fprint(w, data.String()) } } - -// AddHandlerFunc provides interface to add customized handlerFunc -func AddHandlerFunc(api string, h handlerFunc) error { - if _, exist := handlerFuncMap[api]; exist { - return fmt.Errorf("%s already exist", api) - } - handlerFuncMap[api] = h - return nil -} diff --git a/libbeat/api/server.go b/libbeat/api/server.go index 4e7396e4f16..561ab47457d 100644 --- a/libbeat/api/server.go +++ b/libbeat/api/server.go @@ -18,13 +18,14 @@ package api import ( - "errors" "fmt" "net" "net/http" "net/url" "strconv" + "github.com/gorilla/mux" + "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" ) @@ -33,13 +34,13 @@ import ( // and will answer all the routes defined in the received ServeMux. type Server struct { log *logp.Logger - mux *http.ServeMux + mux *mux.Router l net.Listener config Config } -// New creates a new API Server. -func New(log *logp.Logger, mux *http.ServeMux, config *config.C) (*Server, error) { +// New creates a new API Server with no routes attached. +func New(log *logp.Logger, config *config.C) (*Server, error) { if log == nil { log = logp.NewLogger("") } @@ -55,7 +56,12 @@ func New(log *logp.Logger, mux *http.ServeMux, config *config.C) (*Server, error return nil, err } - return &Server{mux: mux, l: l, config: cfg, log: log.Named("api")}, nil + return &Server{ + mux: mux.NewRouter().StrictSlash(true), + l: l, + config: cfg, + log: log.Named("api"), + }, nil } // Start starts the HTTP server and accepting new connection. @@ -73,25 +79,21 @@ func (s *Server) Stop() error { return s.l.Close() } -// AttachHandler will attach a handler at the specified route and return an error instead of panicing. +// AttachHandler will attach a handler at the specified route. Routes are +// matched in the order in which that are attached. func (s *Server) AttachHandler(route string, h http.Handler) (err error) { - defer func() { - if r := recover(); r != nil { - switch r := r.(type) { - case error: - err = r - case string: - err = errors.New(r) - default: - err = fmt.Errorf("failed to register http handler at path %v: %v", route, h) - } - } - }() - s.mux.Handle(route, h) + if err := s.mux.Handle(route, h).GetError(); err != nil { + return err + } s.log.Debugf("Attached handler at %q to server.", route) return nil } +// Router returns the mux.Router that handles all request to the server. +func (s *Server) Router() *mux.Router { + return s.mux +} + func parse(host string, port int) (string, string, error) { url, err := url.Parse(host) if err != nil { diff --git a/libbeat/api/server_test.go b/libbeat/api/server_test.go index 972fafc04e5..8ba0e655e39 100644 --- a/libbeat/api/server_test.go +++ b/libbeat/api/server_test.go @@ -19,11 +19,10 @@ package api import ( "context" - "fmt" "io" - "io/ioutil" "net" "net/http" + "net/http/httptest" "os" "runtime" "testing" @@ -45,8 +44,8 @@ func TestConfiguration(t *testing.T) { "user": "admin", }) - _, err := New(nil, simpleMux(), cfg) - assert.Equal(t, err == nil, false) + _, err := New(nil, cfg) + require.Error(t, err) }) t.Run("when security descriptor is set", func(t *testing.T) { @@ -55,8 +54,8 @@ func TestConfiguration(t *testing.T) { "security_descriptor": "D:P(A;;GA;;;1234)", }) - _, err := New(nil, simpleMux(), cfg) - assert.Equal(t, err == nil, false) + _, err := New(nil, cfg) + require.Error(t, err) }) } @@ -77,21 +76,23 @@ func TestSocket(t *testing.T) { } t.Run("socket doesn't exist before", func(t *testing.T) { - tmpDir, err := ioutil.TempDir("", "testsocket") + tmpDir, err := os.MkdirTemp("", "testsocket") require.NoError(t, err) defer os.RemoveAll(tmpDir) sockFile := tmpDir + "/test.sock" + t.Log(sockFile) cfg := config.MustNewConfigFrom(map[string]interface{}{ "host": "unix://" + sockFile, }) - s, err := New(nil, simpleMux(), cfg) + s, err := New(nil, cfg) require.NoError(t, err) + attachEchoHelloHandler(t, s) go s.Start() defer func() { - s.Stop() + require.NoError(t, s.Stop()) // Make we cleanup behind _, err := os.Stat(sockFile) require.Error(t, err) @@ -104,16 +105,17 @@ func TestSocket(t *testing.T) { require.NoError(t, err) defer r.Body.Close() - body, err := ioutil.ReadAll(r.Body) + body, err := io.ReadAll(r.Body) require.NoError(t, err) assert.Equal(t, "ehlo!", string(body)) fi, err := os.Stat(sockFile) + require.NoError(t, err) assert.Equal(t, socketFileMode, fi.Mode().Perm()) }) t.Run("starting beat and recover a dangling socket file", func(t *testing.T) { - tmpDir, err := ioutil.TempDir("", "testsocket") + tmpDir, err := os.MkdirTemp("", "testsocket") require.NoError(t, err) defer os.RemoveAll(tmpDir) @@ -128,11 +130,12 @@ func TestSocket(t *testing.T) { "host": "unix://" + sockFile, }) - s, err := New(nil, simpleMux(), cfg) + s, err := New(nil, cfg) require.NoError(t, err) + attachEchoHelloHandler(t, s) go s.Start() defer func() { - s.Stop() + require.NoError(t, s.Stop()) // Make we cleanup behind _, err := os.Stat(sockFile) require.Error(t, err) @@ -145,12 +148,13 @@ func TestSocket(t *testing.T) { require.NoError(t, err) defer r.Body.Close() - body, err := ioutil.ReadAll(r.Body) + body, err := io.ReadAll(r.Body) require.NoError(t, err) assert.Equal(t, "ehlo!", string(body)) fi, err := os.Stat(sockFile) + require.NoError(t, err) assert.Equal(t, socketFileMode, fi.Mode().Perm(), "incorrect mode for file %s", sockFile) }) } @@ -163,61 +167,59 @@ func TestHTTP(t *testing.T) { "host": url, }) - s, err := New(nil, simpleMux(), cfg) + s, err := New(nil, cfg) require.NoError(t, err) + attachEchoHelloHandler(t, s) go s.Start() - defer s.Stop() + defer func() { + require.NoError(t, s.Stop()) + }() r, err := http.Get("http://" + s.l.Addr().String() + "/echo-hello") require.NoError(t, err) defer r.Body.Close() - body, err := ioutil.ReadAll(r.Body) + body, err := io.ReadAll(r.Body) require.NoError(t, err) assert.Equal(t, "ehlo!", string(body)) } -func simpleMux() *http.ServeMux { - mux := http.NewServeMux() - mux.HandleFunc("/echo-hello", func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintf(w, "ehlo!") - }) - return mux +func attachEchoHelloHandler(t *testing.T, s *Server) { + t.Helper() + + if err := s.AttachHandler("/echo-hello", newTestHandler("ehlo!")); err != nil { + t.Fatal(err) + } } func TestAttachHandler(t *testing.T) { - url := "http://localhost:0" - cfg := config.MustNewConfigFrom(map[string]interface{}{ - "host": url, + "host": "http://localhost:0", }) - s, err := New(nil, simpleMux(), cfg) + s, err := New(nil, cfg) require.NoError(t, err) - go s.Start() - defer s.Stop() - h := &testHandler{} + req := httptest.NewRequest(http.MethodGet, "http://"+s.l.Addr().String()+"/test", nil) - err = s.AttachHandler("/test", h) + // Test the first handler is attached. + err = s.AttachHandler("/test", newTestHandler("test!")) require.NoError(t, err) + resp := httptest.NewRecorder() + s.mux.ServeHTTP(resp, req) + assert.Equal(t, "test!", resp.Body.String()) - r, err := http.Get("http://" + s.l.Addr().String() + "/test") - require.NoError(t, err) - defer r.Body.Close() - - body, err := io.ReadAll(r.Body) + // Handlers are matched in order so the first one will take precedence. + err = s.AttachHandler("/test", newTestHandler("NOT test!")) require.NoError(t, err) - - assert.Equal(t, "test!", string(body)) - - err = s.AttachHandler("/test", h) - assert.NotNil(t, err) + resp = httptest.NewRecorder() + s.mux.ServeHTTP(resp, req) + assert.Equal(t, "test!", resp.Body.String()) } -type testHandler struct{} - -func (t *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - fmt.Fprintf(w, "test!") +func newTestHandler(response string) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, _ = io.WriteString(w, response) + }) } diff --git a/libbeat/api/server_windows_test.go b/libbeat/api/server_windows_test.go index 2c3885530cd..243d681f775 100644 --- a/libbeat/api/server_windows_test.go +++ b/libbeat/api/server_windows_test.go @@ -39,10 +39,13 @@ func TestNamedPipe(t *testing.T) { "host": p, }) - s, err := New(nil, simpleMux(), cfg) + s, err := New(nil, cfg) require.NoError(t, err) + attachEchoHelloHandler(t, s) go s.Start() - defer s.Stop() + defer func() { + require.NoError(t, s.Stop()) + }() c := http.Client{ Transport: &http.Transport{ diff --git a/libbeat/beat/beat.go b/libbeat/beat/beat.go index 5cf0c932ca8..001d602c46f 100644 --- a/libbeat/beat/beat.go +++ b/libbeat/beat/beat.go @@ -18,6 +18,7 @@ package beat import ( + "github.com/elastic/beats/v7/libbeat/api" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/instrumentation" "github.com/elastic/beats/v7/libbeat/management" @@ -79,6 +80,8 @@ type Beat struct { Keystore keystore.Keystore Instrumentation instrumentation.Instrumentation // instrumentation holds an APM agent for capturing and reporting traces + + API *api.Server // API server. This is nil unless the http endpoint is enabled. } // BeatConfig struct contains the basic configuration of every beat diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 2c42fda05e1..7dfe3393f89 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -404,20 +404,19 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { // Start the API Server before the Seccomp lock down, we do this so we can create the unix socket // set the appropriate permission on the unix domain file without having to whitelist anything // that would be set at runtime. - var s *api.Server // buffer reporter may need to attach to the server. if b.Config.HTTP.Enabled() { - s, err = api.NewWithDefaultRoutes(logp.NewLogger(""), b.Config.HTTP, monitoring.GetNamespace) + b.API, err = api.NewWithDefaultRoutes(logp.NewLogger(""), b.Config.HTTP, monitoring.GetNamespace) if err != nil { return fmt.Errorf("could not start the HTTP server for the API: %w", err) } - s.Start() + b.API.Start() defer func() { - _ = s.Stop() + _ = b.API.Stop() }() if b.Config.HTTPPprof.IsEnabled() { pprof.SetRuntimeProfilingParameters(b.Config.HTTPPprof) - if err := pprof.HttpAttach(b.Config.HTTPPprof, s); err != nil { + if err := pprof.HttpAttach(b.Config.HTTPPprof, b.API); err != nil { return fmt.Errorf("failed to attach http handlers for pprof: %w", err) } } @@ -456,7 +455,7 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { } defer buffReporter.Stop() - if err := s.AttachHandler("/buffer", buffReporter); err != nil { + if err := b.API.AttachHandler("/buffer", buffReporter); err != nil { return err } } diff --git a/libbeat/docs/http-endpoint.asciidoc b/libbeat/docs/http-endpoint.asciidoc index 61eb37bb06a..f021e9f3276 100644 --- a/libbeat/docs/http-endpoint.asciidoc +++ b/libbeat/docs/http-endpoint.asciidoc @@ -197,3 +197,27 @@ curl -XGET 'localhost:5066/stats?pretty' ---- The actual output may contain more metrics specific to {beatname_uc} + +ifdef::has_inputs_endpoint[] +[float] +=== Inputs + +`/inputs/` returns metrics related to input instances. It returns a list of +objects where each object contains metrics for an instance of an input. Each +object will minimally contain an `input` field that identifies the type of input +(e.g. `aws-s3`) and an `id` field that is the unique identifier for the input +instance. + +A request may optionally specify a `type` query parameter to request metrics +for a specific type of input. And `pretty` may be included to have the +returned JSON be pretty formatted. + +[source,js] +---- +curl 'http://localhost:5066/inputs/' +curl 'http://localhost:5066/inputs/?pretty' +curl 'http://localhost:5066/inputs/?type=aws-s3&pretty' +---- + +["source","js",subs="attributes"] +endif::has_inputs_endpoint[] diff --git a/libbeat/docs/loggingconfig.asciidoc b/libbeat/docs/loggingconfig.asciidoc index c25f74e06a7..e09589ee3e0 100644 --- a/libbeat/docs/loggingconfig.asciidoc +++ b/libbeat/docs/loggingconfig.asciidoc @@ -195,8 +195,8 @@ The period after which to log the internal metrics. The default is 30s. ==== `logging.metrics.namespaces` A list of metrics namespaces to report in the logs. Defaults to `[stats]`. -`stats` contains general Beat metrics. `dataset` may be present in some -Beats and contains module or input metrics. +`stats` contains general Beat metrics. `dataset` and `inputs` may be present in +some Beats and contains module or input metrics. ifndef::serverless[] [float] diff --git a/libbeat/monitoring/inputmon/httphandler.go b/libbeat/monitoring/inputmon/httphandler.go new file mode 100644 index 00000000000..65364592308 --- /dev/null +++ b/libbeat/monitoring/inputmon/httphandler.go @@ -0,0 +1,156 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package inputmon + +import ( + "encoding/json" + "errors" + "net/http" + "strings" + + "github.com/gorilla/handlers" + "github.com/gorilla/mux" + + "github.com/elastic/elastic-agent-libs/monitoring" +) + +const ( + route = "/inputs" + contentType = "Content-Type" + applicationJSON = "application/json; charset=utf-8" +) + +type handler struct { + registry *monitoring.Registry +} + +// AttachHandler attaches an HTTP handler to the given mux.Router to handle +// requests to /inputs. +func AttachHandler(r *mux.Router) error { + return attachHandler(r, globalRegistry()) +} + +func attachHandler(r *mux.Router, registry *monitoring.Registry) error { + h := &handler{registry: registry} + r = r.PathPrefix(route).Subrouter() + return r.StrictSlash(true).Handle("/", validationHandler("GET", []string{"pretty", "type"}, h.allInputs)).GetError() +} + +func (h *handler) allInputs(w http.ResponseWriter, req *http.Request) { + requestedPretty, err := getPretty(req) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + requestedType, err := getType(req) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + metrics := monitoring.CollectStructSnapshot(h.registry, monitoring.Full, false) + + filtered := make([]map[string]any, 0, len(metrics)) + for _, ifc := range metrics { + m, ok := ifc.(map[string]any) + if !ok { + continue + } + + if requestedType != "" { + if typ, ok := m["input"].(string); ok && !strings.EqualFold(typ, requestedType) { + continue + } + } + + filtered = append(filtered, m) + } + + w.Header().Set(contentType, applicationJSON) + serveJSON(w, filtered, requestedPretty) +} + +func serveJSON(w http.ResponseWriter, value any, pretty bool) { + w.Header().Set(contentType, applicationJSON) + enc := json.NewEncoder(w) + enc.SetEscapeHTML(false) + if pretty { + enc.SetIndent("", " ") + } + _ = enc.Encode(value) +} + +func getPretty(req *http.Request) (bool, error) { + if !req.URL.Query().Has("pretty") { + return false, nil + } + + switch req.URL.Query().Get("pretty") { + case "", "true": + return true, nil + case "false": + return false, nil + default: + return false, errors.New(`invalid value for "pretty"`) + } +} + +func getType(req *http.Request) (string, error) { + if !req.URL.Query().Has("type") { + return "", nil + } + + switch typ := req.URL.Query().Get("type"); typ { + case "": + return "", errors.New(`"type" requires a non-empty value`) + default: + return strings.ToLower(typ), nil + } +} + +type queryParamHandler struct { + allowedParams map[string]struct{} + next http.Handler +} + +func newQueryParamHandler(queryParams []string, h http.Handler) http.Handler { + m := make(map[string]struct{}, len(queryParams)) + for _, q := range queryParams { + m[q] = struct{}{} + } + return &queryParamHandler{allowedParams: m, next: h} +} + +func (h queryParamHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + for q := range req.URL.Query() { + if _, found := h.allowedParams[q]; !found { + http.Error(w, "Unknown query param "+q, http.StatusBadRequest) + return + } + } + h.next.ServeHTTP(w, req) +} + +func validationHandler(method string, queryParams []string, h http.HandlerFunc) http.Handler { + var next http.Handler = h + next = handlers.CompressHandler(next) + next = newQueryParamHandler(queryParams, next) + next = handlers.MethodHandler{method: next} + return next +} diff --git a/libbeat/monitoring/inputmon/httphandler_test.go b/libbeat/monitoring/inputmon/httphandler_test.go new file mode 100644 index 00000000000..26257fc72cb --- /dev/null +++ b/libbeat/monitoring/inputmon/httphandler_test.go @@ -0,0 +1,122 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package inputmon + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "strconv" + "strings" + "testing" + + "github.com/gorilla/mux" + "github.com/stretchr/testify/assert" + + "github.com/elastic/elastic-agent-libs/monitoring" +) + +type TestCase struct { + request string + status int + method string + body string +} + +var testCases = []TestCase{ + {request: "/inputs/", status: http.StatusOK, body: `[{"gauge":13344,"id":"123abc","input":"foo"}]`}, + {request: "/inputs", status: http.StatusOK}, + {request: "/inputs/", method: "POST", status: http.StatusMethodNotAllowed}, + {request: "/inputs/?XX", status: http.StatusBadRequest}, + {request: "/inputs/?pretty", status: http.StatusOK}, + {request: "/inputs/?type", status: http.StatusBadRequest}, + {request: "/inputs/?type=udp", status: http.StatusOK, body: `[]`}, + {request: "/inputs/?type=FOO", status: http.StatusOK, body: `[{"gauge":13344,"id":"123abc","input":"foo"}]`}, + {request: "/inputs/XX", status: http.StatusNotFound}, +} + +func TestHandler(t *testing.T) { + parent := monitoring.NewRegistry() + reg, _ := NewInputRegistry("foo", "123abc", parent) + monitoring.NewInt(reg, "gauge").Set(13344) + + r := mux.NewRouter() + s := httptest.NewServer(r) + defer s.Close() + + if err := attachHandler(r, parent); err != nil { + t.Fatal(err) + } + + t.Logf("http://%s", s.Listener.Addr().String()) + + for _, tc := range testCases { + tc := tc + if tc.method == "" { + tc.method = http.MethodGet + } + + t.Run(tc.method+" "+strings.ReplaceAll(tc.request, "/", "_"), func(t *testing.T) { + req, err := http.NewRequestWithContext(context.Background(), tc.method, s.URL+tc.request, nil) + if err != nil { + t.Fatal(err) + } + + resp, err := s.Client().Do(req) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatal(err) + } + + t.Logf("body=%s", body) + if resp.StatusCode != tc.status { + t.Fatalf("bad status code, want=%d, got=%d", tc.status, resp.StatusCode) + } + + if tc.body != "" { + assert.JSONEq(t, tc.body, string(body)) + } + }) + } +} + +func BenchmarkHandlers(b *testing.B) { + reg := monitoring.NewRegistry() + for i := 0; i < 1000; i++ { + reg, _ := NewInputRegistry("foo", "id-"+strconv.Itoa(i), reg) + monitoring.NewInt(reg, "gauge").Set(int64(i)) + } + + h := &handler{registry: reg} + + b.Run("allInputs", func(b *testing.B) { + req := httptest.NewRequest(http.MethodGet, "/inputs/", nil) + resp := httptest.NewRecorder() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + h.allInputs(resp, req) + } + }) +} diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 19a47e3e543..0e88655ba08 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -683,7 +683,7 @@ Will produce the following output: === Metrics This input exposes metrics under the <>. -These metrics are exposed under the `/dataset` path. They can be used to +These metrics are exposed under the `/inputs` path. They can be used to observe the activity of the input. [options="header"] diff --git a/x-pack/filebeat/input/awscloudwatch/input_integration_test.go b/x-pack/filebeat/input/awscloudwatch/input_integration_test.go index 3ccb7bc72be..66ab4b3fe47 100644 --- a/x-pack/filebeat/input/awscloudwatch/input_integration_test.go +++ b/x-pack/filebeat/input/awscloudwatch/input_integration_test.go @@ -190,7 +190,7 @@ func TestInputWithLogGroupNamePrefix(t *testing.T) { } snap := mapstr.M(monitoring.CollectStructSnapshot( - monitoring.GetNamespace("dataset").GetRegistry(), + monitoring.GetNamespace("inputs").GetRegistry(), monitoring.Full, false)) t.Log(snap.StringToPrint()) diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 024caa23ab5..2b3b8c45ee7 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -184,7 +184,7 @@ func TestInputRunSQS(t *testing.T) { drainSQS(t, tfConfig.AWSRegion, tfConfig.QueueURL) // Ensure metrics are removed before testing. - monitoring.GetNamespace("dataset").GetRegistry().Remove(inputID) + monitoring.GetNamespace("inputs").GetRegistry().Remove(inputID) uploadS3TestFiles(t, tfConfig.AWSRegion, tfConfig.BucketName, "testdata/events-array.json", @@ -225,7 +225,7 @@ func TestInputRunSQS(t *testing.T) { } snap := mapstr.M(monitoring.CollectStructSnapshot( - monitoring.GetNamespace("dataset").GetRegistry(), + monitoring.GetNamespace("inputs").GetRegistry(), monitoring.Full, false)) t.Log(snap.StringToPrint()) @@ -247,7 +247,7 @@ func TestInputRunS3(t *testing.T) { tfConfig := getTerraformOutputs(t) // Ensure metrics are removed before testing. - monitoring.GetNamespace("dataset").GetRegistry().Remove(inputID) + monitoring.GetNamespace("inputs").GetRegistry().Remove(inputID) uploadS3TestFiles(t, tfConfig.AWSRegion, tfConfig.BucketName, "testdata/events-array.json", @@ -288,7 +288,7 @@ func TestInputRunS3(t *testing.T) { } snap := mapstr.M(monitoring.CollectStructSnapshot( - monitoring.GetNamespace("dataset").GetRegistry(), + monitoring.GetNamespace("inputs").GetRegistry(), monitoring.Full, false)) t.Log(snap.StringToPrint()) @@ -463,7 +463,7 @@ func TestInputRunSNS(t *testing.T) { drainSQS(t, tfConfig.AWSRegion, tfConfig.QueueURLForSNS) // Ensure metrics are removed before testing. - monitoring.GetNamespace("dataset").GetRegistry().Remove(inputID) + monitoring.GetNamespace("inputs").GetRegistry().Remove(inputID) uploadS3TestFiles(t, tfConfig.AWSRegion, tfConfig.BucketNameForSNS, "testdata/events-array.json", @@ -503,7 +503,7 @@ func TestInputRunSNS(t *testing.T) { } snap := mapstr.M(monitoring.CollectStructSnapshot( - monitoring.GetNamespace("dataset").GetRegistry(), + monitoring.GetNamespace("inputs").GetRegistry(), monitoring.Full, false)) t.Log(snap.StringToPrint())