From 378721403f4edefbba7ff9258ce95657d1eab8c4 Mon Sep 17 00:00:00 2001 From: Silvia Mitter Date: Tue, 15 Dec 2020 11:03:25 +0100 Subject: [PATCH] Ensure ECS compliant logging when enabled. (#3829) If `logging.ecs` is set log data in ECS compliant way. closes #3796 --- _meta/beat.yml | 4 +- apm-server.docker.yml | 4 +- apm-server.yml | 4 +- beater/middleware/log_middleware.go | 111 +++++++++--------- beater/middleware/log_middleware_test.go | 84 ++++++------- changelogs/head.asciidoc | 1 + cmd/root.go | 1 + .../docs/loggingconfig.asciidoc | 3 +- systemtest/export_test.go | 2 + systemtest/logging_test.go | 8 +- tests/system/apmserver.py | 4 +- tests/system/config/apm-server.yml.j2 | 31 +---- tests/system/test_integration_acm.py | 51 ++++---- tests/system/test_jaeger.py | 1 + 14 files changed, 144 insertions(+), 165 deletions(-) diff --git a/_meta/beat.yml b/_meta/beat.yml index fbf56489820..d1b8a337f6a 100644 --- a/_meta/beat.yml +++ b/_meta/beat.yml @@ -1063,8 +1063,8 @@ output.elasticsearch: # Set to true, to log messages with minimal required Elastic Common Schema (ECS) # information. Recommended to use in combination with `logging.json=true` -# Defaults to false. -#logging.ecs: false +# Defaults to true. +#logging.ecs: true #=============================== HTTP Endpoint =============================== diff --git a/apm-server.docker.yml b/apm-server.docker.yml index 91548b1870a..4da27ac1b09 100644 --- a/apm-server.docker.yml +++ b/apm-server.docker.yml @@ -1063,8 +1063,8 @@ output.elasticsearch: # Set to true, to log messages with minimal required Elastic Common Schema (ECS) # information. Recommended to use in combination with `logging.json=true` -# Defaults to false. -#logging.ecs: false +# Defaults to true. +#logging.ecs: true #=============================== HTTP Endpoint =============================== diff --git a/apm-server.yml b/apm-server.yml index 1b8ddd03050..2b099f5fa08 100644 --- a/apm-server.yml +++ b/apm-server.yml @@ -1063,8 +1063,8 @@ output.elasticsearch: # Set to true, to log messages with minimal required Elastic Common Schema (ECS) # information. Recommended to use in combination with `logging.json=true` -# Defaults to false. -#logging.ecs: false +# Defaults to true. +#logging.ecs: true #=============================== HTTP Endpoint =============================== diff --git a/beater/middleware/log_middleware.go b/beater/middleware/log_middleware.go index 7394de1f4bb..9d465a74289 100644 --- a/beater/middleware/log_middleware.go +++ b/beater/middleware/log_middleware.go @@ -34,73 +34,76 @@ import ( // LogMiddleware returns a middleware taking care of logging processing a request in the middleware and the request handler func LogMiddleware() Middleware { - logger := logp.NewLogger(logs.Request) return func(h request.Handler) (request.Handler, error) { - return func(c *request.Context) { - var reqID, transactionID, traceID string start := time.Now() - tx := apm.TransactionFromContext(c.Request.Context()) - if tx != nil { - // This request is being traced, grab its IDs to add to logs. - traceContext := tx.TraceContext() - transactionID = traceContext.Span.String() - traceID = traceContext.Trace.String() - reqID = transactionID - } else { - uuid, err := uuid.NewV4() - if err != nil { - id := request.IDResponseErrorsInternal - logger.Errorw(request.MapResultIDToStatus[id].Keyword, "error", err) - c.Result.SetWithError(id, err) - c.Write() - return - } - reqID = uuid.String() - } - - reqLogger := logger.With( - "request_id", reqID, - "method", c.Request.Method, - "URL", c.Request.URL, - "content_length", c.Request.ContentLength, - "remote_address", utility.RemoteAddr(c.Request), - "user-agent", c.Request.Header.Get(headers.UserAgent)) - - if traceID != "" { - reqLogger = reqLogger.With( - "trace.id", traceID, - "transaction.id", transactionID, - ) + c.Logger = loggerWithRequestContext(c) + var err error + if c.Logger, err = loggerWithTraceContext(c); err != nil { + id := request.IDResponseErrorsInternal + c.Logger.Error(request.MapResultIDToStatus[id].Keyword, logp.Error(err)) + c.Result.SetWithError(id, err) + c.Write() + return } - - c.Logger = reqLogger h(c) - reqLogger = reqLogger.With("event.duration", time.Since(start)) - + c.Logger = c.Logger.With("event.duration", time.Since(start)) if c.MultipleWriteAttempts() { - reqLogger.Warn("multiple write attempts") + c.Logger.Warn("multiple write attempts") } - keyword := c.Result.Keyword if keyword == "" { keyword = "handled request" } - - keysAndValues := []interface{}{"response_code", c.Result.StatusCode} - if c.Result.Err != nil { - keysAndValues = append(keysAndValues, "error", c.Result.Err.Error()) - } - if c.Result.Stacktrace != "" { - keysAndValues = append(keysAndValues, "stacktrace", c.Result.Stacktrace) - } - + c.Logger = loggerWithResult(c) if c.Result.Failure() { - reqLogger.Errorw(keyword, keysAndValues...) - } else { - reqLogger.Infow(keyword, keysAndValues...) + c.Logger.Error(keyword) + return } - + c.Logger.Info(keyword) }, nil } } + +func loggerWithRequestContext(c *request.Context) *logp.Logger { + logger := logp.NewLogger(logs.Request).With( + "url.original", c.Request.URL.String(), + "http.request.method", c.Request.Method, + "user_agent.original", c.Request.Header.Get(headers.UserAgent), + "source.address", utility.RemoteAddr(c.Request)) + if c.Request.ContentLength != -1 { + logger = logger.With("http.request.body.bytes", c.Request.ContentLength) + } + return logger +} + +func loggerWithTraceContext(c *request.Context) (*logp.Logger, error) { + tx := apm.TransactionFromContext(c.Request.Context()) + if tx == nil { + uuid, err := uuid.NewV4() + if err != nil { + return c.Logger, err + } + return c.Logger.With("http.request.id", uuid.String()), nil + } + // This request is being traced, grab its IDs to add to logs. + traceContext := tx.TraceContext() + transactionID := traceContext.Span.String() + return c.Logger.With( + "trace.id", traceContext.Trace.String(), + "transaction.id", transactionID, + "http.request.id", transactionID, + ), nil +} + +func loggerWithResult(c *request.Context) *logp.Logger { + logger := c.Logger.With( + "http.response.status_code", c.Result.StatusCode) + if c.Result.Err != nil { + logger = logger.With("error.message", c.Result.Err.Error()) + } + if c.Result.Stacktrace != "" { + logger = logger.With("error.stack_trace", c.Result.Stacktrace) + } + return logger +} diff --git a/beater/middleware/log_middleware_test.go b/beater/middleware/log_middleware_test.go index 64133ad8142..ffbaec40db3 100644 --- a/beater/middleware/log_middleware_test.go +++ b/beater/middleware/log_middleware_test.go @@ -21,7 +21,6 @@ import ( "net/http" "testing" - "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" @@ -29,7 +28,9 @@ import ( "go.elastic.co/apm" "go.elastic.co/apm/apmtest" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/logp/configure" "github.com/elastic/apm-server/beater/beatertest" "github.com/elastic/apm-server/beater/headers" @@ -38,17 +39,14 @@ import ( ) func TestLogMiddleware(t *testing.T) { - err := logp.DevelopmentSetup(logp.ToObserverOutput()) - require.NoError(t, err) testCases := []struct { name, message string level zapcore.Level handler request.Handler code int - error error - stacktrace bool traced bool + ecsKeys []string }{ { name: "Accepted", @@ -56,6 +54,7 @@ func TestLogMiddleware(t *testing.T) { level: zapcore.InfoLevel, handler: beatertest.Handler202, code: http.StatusAccepted, + ecsKeys: []string{"url.original"}, }, { name: "Traced", @@ -63,6 +62,7 @@ func TestLogMiddleware(t *testing.T) { level: zapcore.InfoLevel, handler: beatertest.Handler202, code: http.StatusAccepted, + ecsKeys: []string{"url.original", "trace.id", "transaction.id"}, traced: true, }, { @@ -71,16 +71,15 @@ func TestLogMiddleware(t *testing.T) { level: zapcore.ErrorLevel, handler: beatertest.Handler403, code: http.StatusForbidden, - error: errors.New("forbidden request"), + ecsKeys: []string{"url.original", "error.message"}, }, { - name: "Panic", - message: "internal error", - level: zapcore.ErrorLevel, - handler: Apply(RecoverPanicMiddleware(), beatertest.HandlerPanic), - code: http.StatusInternalServerError, - error: errors.New("panic on Handle"), - stacktrace: true, + name: "Panic", + message: "internal error", + level: zapcore.ErrorLevel, + handler: Apply(RecoverPanicMiddleware(), beatertest.HandlerPanic), + code: http.StatusInternalServerError, + ecsKeys: []string{"url.original", "error.message", "error.stack_trace"}, }, { name: "Error without keyword", @@ -90,12 +89,19 @@ func TestLogMiddleware(t *testing.T) { c.Result.StatusCode = http.StatusForbidden c.Write() }, - code: http.StatusForbidden, + code: http.StatusForbidden, + ecsKeys: []string{"url.original"}, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + // log setup + configure.Logging("APM Server test", + common.MustNewConfigFrom(`{"ecs":true}`)) + require.NoError(t, logp.DevelopmentSetup(logp.ToObserverOutput())) + + // prepare and record request c, rec := beatertest.DefaultContextWithResponseRecorder() c.Request.Header.Set(headers.UserAgent, tc.name) if tc.traced { @@ -105,39 +111,27 @@ func TestLogMiddleware(t *testing.T) { } Apply(LogMiddleware(), tc.handler)(c) + // check log lines assert.Equal(t, tc.code, rec.Code) - for i, entry := range logp.ObserverLogs().TakeAll() { - // expect only one log entry per request - assert.Equal(t, i, 0) - assert.Equal(t, logs.Request, entry.LoggerName) - assert.Equal(t, tc.level, entry.Level) - assert.Equal(t, tc.message, entry.Message) + entries := logp.ObserverLogs().TakeAll() + require.Equal(t, 1, len(entries)) + entry := entries[0] + assert.Equal(t, logs.Request, entry.LoggerName) + assert.Equal(t, tc.level, entry.Level) + assert.Equal(t, tc.message, entry.Message) - ec := entry.ContextMap() - t.Logf("context map: %v", ec) - - assert.NotEmpty(t, ec["request_id"]) - assert.NotEmpty(t, ec["method"]) - assert.Equal(t, c.Request.URL.String(), ec["URL"]) - assert.NotEmpty(t, ec["remote_address"]) - assert.Contains(t, ec, "event.duration") - assert.Equal(t, c.Request.Header.Get(headers.UserAgent), ec["user-agent"]) - // zap encoded type - assert.Equal(t, tc.code, int(ec["response_code"].(int64))) - if tc.error != nil { - assert.Equal(t, tc.error.Error(), ec["error"]) - } - if tc.stacktrace { - assert.NotZero(t, ec["stacktrace"]) - } - if tc.traced { - assert.NotEmpty(t, ec, "trace.id") - assert.NotEmpty(t, ec, "transaction.id") - assert.Equal(t, ec["request_id"], ec["transaction.id"]) - } else { - assert.NotContains(t, ec, "trace.id") - assert.NotContains(t, ec, "transaction.id") - } + encoder := zapcore.NewMapObjectEncoder() + ec := common.MapStr{} + for _, f := range entry.Context { + f.AddTo(encoder) + ec.DeepUpdate(encoder.Fields) + } + keys := []string{"http.request.id", "http.request.method", "http.request.body.bytes", + "source.address", "user_agent.original", "http.response.status_code", "event.duration"} + keys = append(keys, tc.ecsKeys...) + for _, key := range keys { + ok, _ := ec.HasKey(key) + assert.True(t, ok, key) } }) } diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index ad262eae377..4426a95395d 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -28,3 +28,4 @@ https://github.com/elastic/apm-server/compare/7.10\...master[View commits] * Label/custom/mark keys are now sanitized (rather than validated and rejected) by the server {pull}4465[4465] * Upgrade Go to 1.14.12 {pull}4478[4478] * Added apm-server.response_headers config {pull}4523[4523] +* Switch logging format to be ECS compliant where possible {pull}3829[3829] \ No newline at end of file diff --git a/cmd/root.go b/cmd/root.go index 57aa3502a6a..8d9173b5f15 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -48,6 +48,7 @@ var libbeatConfigOverrides = []cfgfile.ConditionalOverride{{ "metrics": map[string]interface{}{ "enabled": false, }, + "ecs": true, }, }), }} diff --git a/docs/copied-from-beats/docs/loggingconfig.asciidoc b/docs/copied-from-beats/docs/loggingconfig.asciidoc index 767bd7c2f08..2d2e9eaa065 100644 --- a/docs/copied-from-beats/docs/loggingconfig.asciidoc +++ b/docs/copied-from-beats/docs/loggingconfig.asciidoc @@ -249,8 +249,7 @@ When true, logs messages in JSON format. The default is false. [float] ==== `logging.ecs` -When true, logs messages with minimal required Elastic Common Schema (ECS) -information. +When true, logs messages in Elastic Common Schema (ECS) compliant format. ifndef::serverless[] [float] diff --git a/systemtest/export_test.go b/systemtest/export_test.go index b11940d7245..51849acfbc0 100644 --- a/systemtest/export_test.go +++ b/systemtest/export_test.go @@ -49,6 +49,7 @@ func TestExportConfigDefaults(t *testing.T) { expectedConfig := strings.ReplaceAll(` logging: + ecs: true metrics: enabled: false path: @@ -69,6 +70,7 @@ func TestExportConfigOverrideDefaults(t *testing.T) { expectedConfig := strings.ReplaceAll(` logging: + ecs: true metrics: enabled: true path: diff --git a/systemtest/logging_test.go b/systemtest/logging_test.go index 355dd1e9de5..f3a0f8d5cdf 100644 --- a/systemtest/logging_test.go +++ b/systemtest/logging_test.go @@ -69,8 +69,8 @@ func TestAPMServerRequestLoggingValid(t *testing.T) { srv.Close() for _, entry := range srv.Logs.All() { - if entry.Logger == "request" && entry.Fields["URL"] == "/intake/v2/events" { - statusCode, _ := entry.Fields["response_code"].(float64) + if entry.Logger == "request" && entry.Fields["url.original"] == "/intake/v2/events" { + statusCode, _ := entry.Fields["http.response.status_code"].(float64) logEntries = append(logEntries, entry) requestEntries = append(requestEntries, requestEntry{ level: entry.Level, @@ -95,8 +95,8 @@ func TestAPMServerRequestLoggingValid(t *testing.T) { }}, requestEntries) assert.NotContains(t, logEntries[0].Fields, "error") - assert.Regexp(t, "validation error: 'transaction' required", logEntries[1].Fields["error"]) - assert.Equal(t, "event exceeded the permitted size.", logEntries[2].Fields["error"]) + assert.Regexp(t, "validation error: 'transaction' required", logEntries[1].Fields["error.message"]) + assert.Equal(t, "event exceeded the permitted size.", logEntries[2].Fields["error.message"]) } // validMetadataJSON returns valid JSON-encoded metadata, diff --git a/tests/system/apmserver.py b/tests/system/apmserver.py index bf091794fa8..dd72614b890 100644 --- a/tests/system/apmserver.py +++ b/tests/system/apmserver.py @@ -337,8 +337,8 @@ def check_for_no_smap(self, doc): def logged_requests(self, url="/intake/v2/events"): for line in self.get_log_lines(): jline = json.loads(line) - u = urlparse(jline.get("URL", "")) - if jline.get("logger") == "request" and u.path == url: + u = urlparse(jline.get("url.original", "")) + if jline.get("log.logger") == "request" and u.path == url: yield jline def approve_docs(self, base_path, received): diff --git a/tests/system/config/apm-server.yml.j2 b/tests/system/config/apm-server.yml.j2 index 1723202193e..8b075e8e97f 100644 --- a/tests/system/config/apm-server.yml.j2 +++ b/tests/system/config/apm-server.yml.j2 @@ -249,22 +249,9 @@ output.elasticsearch: ############################# Logging ######################################### -{% if logging_json or logging_level %} +{% if logging_json or logging_level or logging_ecs_disabled %} logging: -{% else %} -#logging: {% endif %} - # Send all logging output to syslog. On Windows default is false, otherwise - # default is true. - #to_syslog: true - - # Write all logging output to files. Beats automatically rotate files if configurable - # limit is reached. - #to_files: false - - # Enable debug output for selected components. - #selectors: [] - {% if logging_json %} # Set to true to log messages in json format. json: {{ logging_json }} @@ -275,19 +262,9 @@ logging: level: {{ logging_level }} {% endif %} - #files: - # The directory where the log files will written to. - #path: /var/log/apm-server - - # The name of the files where the logs are written to. - #name: apm-server - - # Configure log file size limit. If limit is reached, log file will be - # automatically rotated - #rotateeverybytes: 10485760 # = 10MB - - # Number of rotated log files to keep. Oldest files will be deleted first. - #keepfiles: 7 +{% if logging_ecs_disabled %} + ecs: false +{% endif %} queue.mem.flush.min_events: {{ queue_flush }} diff --git a/tests/system/test_integration_acm.py b/tests/system/test_integration_acm.py index e840b6805dd..03c619f13a6 100644 --- a/tests/system/test_integration_acm.py +++ b/tests/system/test_integration_acm.py @@ -42,10 +42,10 @@ def test_config_requests(self): ) assert r1.status_code == 400, r1.status_code expect_log.append({ - "level": "error", + "log.level": "error", "message": "invalid query", - "error": "service.name is required", - "response_code": 400, + "error.message": "service.name is required", + "http.response.status_code": 400, }) # no configuration for service @@ -55,9 +55,9 @@ def test_config_requests(self): ) assert r2.status_code == 200, r2.status_code expect_log.append({ - "level": "info", + "log.level": "info", "message": "request ok", - "response_code": 200, + "http.response.status_code": 200, }) self.assertEqual({}, r2.json()) @@ -70,9 +70,9 @@ def test_config_requests(self): assert r3.status_code == 200, r3.status_code # TODO (gr): validate Cache-Control header - https://github.com/elastic/apm-server/issues/2438 expect_log.append({ - "level": "info", + "log.level": "info", "message": "request ok", - "response_code": 200, + "http.response.status_code": 200, }) self.assertEqual({"transaction_sample_rate": "0.05"}, r3.json()) @@ -85,9 +85,9 @@ def test_config_requests(self): }) assert r3_again.status_code == 304, r3_again.status_code expect_log.append({ - "level": "info", + "log.level": "info", "message": "not modified", - "response_code": 304, + "http.response.status_code": 304, }) self.create_service_config( @@ -103,9 +103,9 @@ def test_config_requests(self): assert r4.status_code == 200, r4.status_code self.assertEqual({"transaction_sample_rate": "0.15"}, r4.json()) expect_log.append({ - "level": "info", + "log.level": "info", "message": "request ok", - "response_code": 200, + "http.response.status_code": 200, }) # not modified on re-request @@ -120,9 +120,9 @@ def test_config_requests(self): }) assert r4_again.status_code == 304, r4_again.status_code expect_log.append({ - "level": "info", + "log.level": "info", "message": "not modified", - "response_code": 304, + "http.response.status_code": 304, }) self.update_service_config( @@ -144,9 +144,9 @@ def test_config_requests(self): assert r4_post_update.status_code == 200, r4_post_update.status_code self.assertEqual({"transaction_sample_rate": "0.99"}, r4_post_update.json()) expect_log.append({ - "level": "info", + "log.level": "info", "message": "request ok", - "response_code": 200, + "http.response.status_code": 200, }) # configuration for service+environment (all includes non existing) @@ -158,14 +158,15 @@ def test_config_requests(self): headers={"Content-Type": "application/json"}) assert r5.status_code == 200, r5.status_code expect_log.append({ - "level": "info", + "log.level": "info", "message": "request ok", - "response_code": 200, + "http.response.status_code": 200, }) self.assertEqual({"transaction_sample_rate": "0.05"}, r5.json()) config_request_logs = list(self.logged_requests(url="/config/v1/agents")) - assert len(config_request_logs) == len(expect_log) + assert len(config_request_logs) == len( + expect_log), "expected\n{}\nreceived\n{}".format(expect_log, config_request_logs) for want, got in zip(expect_log, config_request_logs): assert set(want).issubset(got) @@ -207,15 +208,15 @@ def test_config_requests(self): config_request_logs = list(self.logged_requests(url="/config/v1/agents")) assert len(config_request_logs) == 2, config_request_logs assert set({ - "level": "error", + "log.level": "error", "message": "unauthorized", - "error": "unauthorized", - "response_code": 401, + "error.message": "unauthorized", + "http.response.status_code": 401, }).issubset(config_request_logs[0]) assert set({ - "level": "error", + "log.level": "error", "message": "unable to retrieve connection to Kibana", - "response_code": 503, + "http.response.status_code": 503, }).issubset(config_request_logs[1]) @@ -234,9 +235,9 @@ def test_log_kill_switch_active(self): assert r.status_code == 403, r.status_code config_request_logs = list(self.logged_requests(url="/config/v1/agents")) assert set({ - "level": "error", + "log.level": "error", "message": "forbidden request", - "response_code": 403, + "http.response.status_code": 403, }).issubset(config_request_logs[0]) diff --git a/tests/system/test_jaeger.py b/tests/system/test_jaeger.py index 20e5073970d..12f4b673953 100644 --- a/tests/system/test_jaeger.py +++ b/tests/system/test_jaeger.py @@ -33,6 +33,7 @@ def config(self): # check that the authorization tag is always removed, # even if there's no secret token / API Key auth. "jaeger_grpc_auth_tag": "authorization", + "logging_ecs_disabled": "true", }) return cfg