diff --git a/docs/reference/mapping/params/format.asciidoc b/docs/reference/mapping/params/format.asciidoc index dff7bb4a11ee4..2be120056aed6 100644 --- a/docs/reference/mapping/params/format.asciidoc +++ b/docs/reference/mapping/params/format.asciidoc @@ -52,7 +52,7 @@ not accidentally map irrelevant strings as dates. The following tables lists all the defaults ISO formats supported: -`epoch_millis`:: +[[epoch-millis]]`epoch_millis`:: A formatter for the number of milliseconds since the epoch. Note, that this timestamp is subject to the limits of a Java `Long.MIN_VALUE` and diff --git a/docs/reference/rest-api/index.asciidoc b/docs/reference/rest-api/index.asciidoc index 04f014b75054b..7393b6b9a8a7e 100644 --- a/docs/reference/rest-api/index.asciidoc +++ b/docs/reference/rest-api/index.asciidoc @@ -30,6 +30,7 @@ not be included yet. * <> * <> * <> +* <> * <> * <> * <> @@ -72,6 +73,7 @@ include::{es-repo-dir}/ilm/apis/ilm-api.asciidoc[] include::{es-repo-dir}/ingest/apis/index.asciidoc[] include::info.asciidoc[] include::{es-repo-dir}/licensing/index.asciidoc[] +include::logs.asciidoc[] include::{xes-repo-dir}/rest-api/logstash/index.asciidoc[] include::{es-repo-dir}/ml/common/apis/index.asciidoc[] include::{es-repo-dir}/ml/anomaly-detection/apis/index.asciidoc[] diff --git a/docs/reference/rest-api/logs.asciidoc b/docs/reference/rest-api/logs.asciidoc new file mode 100644 index 0000000000000..9546ac4029f53 --- /dev/null +++ b/docs/reference/rest-api/logs.asciidoc @@ -0,0 +1,278 @@ +[role="xpack"] +[[logs-api]] +== Log ingestion API + +experimental::[] + +Provides a simple JSON API to ingest log events into {es}. + +[discrete] +[[logs-api-request]] +=== {api-request-title} + +`POST /_logs` + +`POST /_logs/` + +`POST /_logs//` + +[discrete] +[[logs-api-prereqs]] +=== {api-prereq-title} +* If the {es} {security-features} are enabled, you must have the `create` +<> for the target data stream. 
+* As the target stream depends on routing rules and the data provided in the log events, +it's recommended to grant permissions for `logs-*-*`. +* Automatic data stream creation requires a matching index template with data +stream enabled. See <>. + +[discrete] +[[logs-api-desc]] +=== {api-description-title} + +Provides a way to ingest log events into {es}, similar to the <>. + +The log events are specified in the request body using a newline-delimited JSON (NDJSON) structure. + +The events are indexed into the `logs--` <>, +according to the dataset and namespace parameters, which can be provided globally or on a per-event basis. + +The endpoint is designed so that logs are never dropped, as long as the cluster has enough capacity. + +If an error occurs during ingestion, +the affected logs are sent to the `logs-dlq-` data stream, which acts as a dead letter queue for failed events. +However, log ingestion should rarely fail, as the mappings in the built-in index template for the `logs-*-*` data streams are designed to minimize mapping conflicts. + +A <> from the {ecs-ref}[Elastic Common Schema (ECS)] that are commonly used to search or filter logs are indexed by default. +All other fields are not indexed by default, +but you can still add any top-level fields and use them in searches and aggregations because the default index template for logs +<>. + +For custom fields that are frequently used in searches or aggregations, you might want to leverage the speed benefits of indexing the field. +This comes at the expense of a larger index size and more processing at ingest time. +To index a custom field, <> for your dataset `logs--*` based on the built-in index template for `logs-*-*` and add your custom field to the mappings. +To immediately apply the change, <>. +This affects any new data added to the stream after the rollover. +However, it does not affect the data stream’s existing backing indices or existing data.
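+
+As a sketch of those steps, the following request pair indexes a hypothetical custom field for a `myapp` dataset.
+The template name, the custom field, and the `priority` value are illustrative only; a real template should be based on the mappings and settings of the built-in index template for `logs-*-*`.
+
+[source,console]
+----
+PUT _index_template/logs-myapp-template
+{
+  "index_patterns": ["logs-myapp-*"],
+  "data_stream": {},
+  "priority": 500,
+  "template": {
+    "mappings": {
+      "properties": {
+        "http.response.status_code": {
+          "type": "long"
+        }
+      }
+    }
+  }
+}
+
+POST logs-myapp-default/_rollover
+----
+// TEST[skip:illustrative sketch]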
+ +All fields, aside from the `@timestamp` field, are configured to <> values. +This means that if a log event contains a field whose type is incompatible with the type of the field that exists in the mapping, +{es} will ignore the field instead of rejecting the whole document, +for example, when a string is provided for a field that is mapped as an integer. +Note that this currently doesn't apply to object/scalar mismatches, such as `"foo": "bar"` vs `"foo.bar": "baz"`. + +[discrete] +[[logs-api-path-params]] +=== {api-path-parms-title} + +{ecs-ref}/ecs-data_stream.html#field-data-stream-dataset[`data_stream.dataset`]:: + (Optional, string) + Defaults to `generic`. + Describes the ingested data and its structure. + It is highly recommended to provide a value for this so that you can add structure to your logs after the fact. + Example: `nginx.access`. + +{ecs-ref}/ecs-data_stream.html#field-data-stream-namespace[`data_stream.namespace`]:: + (Optional, string) + Defaults to `default`. + A user-configurable arbitrary grouping, such as an environment (dev, prod, or qa), a team, or a strategic business unit. + +[discrete] +[[logs-api-query-params]] +=== {api-query-parms-title} + +Any provided query parameter will be added to each log line. +For example, `/_logs?service.name=myapp` will add `"service.name": "myapp"` to all logs. + +[discrete] +[[logs-api-request-body]] +=== {api-request-body-title} +The request body contains a newline-delimited list of log events to ingest. +The individual events don't have any required fields and can contain arbitrary JSON content. +There is no required structure for the log events, and you can add any top-level fields. +However, it is recommended to follow the {ecs-ref}[Elastic Common Schema (ECS)] to structure your logs. + +TIP: Use the {ecs-logging-ref}/intro.html[ECS logging libraries] to create ECS-compliant JSON logs.
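+
+For example, a request body following this structure might look like the following NDJSON sketch; the field values are illustrative:
+
+[source,js]
+----
+{"_metadata": {"service.name": "myapp", "service.environment": "production"}}
+{"@timestamp": "2022-05-23T08:05:34.853Z", "log.level": "INFO", "message": "Listening on port 8080"}
+{"@timestamp": "2022-05-23T08:05:35.153Z", "log.level": "ERROR", "message": "Connection refused", "error.type": "ConnectException"}
+----
+// NOTCONSOLE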
+ +Only the following fields, which are commonly used for searching, filtering, and correlating logs, are indexed by default: + +* {ecs-ref}/ecs-base.html#field-timestamp[`@timestamp`] + + If not provided, will be set to the current time. + Can be provided as <> or <> by default. +* {ecs-ref}/ecs-data_stream.html#field-data-stream-dataset[`data_stream.dataset`] + + Overrides the `data_stream.dataset` <> on a per-event basis. +* {ecs-ref}/ecs-data_stream.html#field-data-stream-namespace[`data_stream.namespace`] + + Overrides the `data_stream.namespace` <> on a per-event basis. +* {ecs-ref}/ecs-base.html#field-message[`message`] +* {ecs-ref}/ecs-log.html#field-log-level[`log.level`] +* {ecs-ref}/ecs-log.html#field-log-logger[`log.logger`] +* {ecs-ref}/ecs-service.html#field-service-name[`service.name`] +* {ecs-ref}/ecs-service.html#field-service-environment[`service.environment`] +* {ecs-ref}/ecs-service.html#field-service-version[`service.version`] +* {ecs-ref}/ecs-tracing.html#field-trace-id[`trace.id`] +* {ecs-ref}/ecs-tracing.html#field-transaction-id[`transaction.id`] +* {ecs-ref}/ecs-tracing.html#field-span-id[`span.id`] +* {ecs-ref}/ecs-process.html#field-process-pid[`process.pid`] +* {ecs-ref}/ecs-process.html#field-process-thread-name[`process.thread.name`] +* {ecs-ref}/ecs-error.html#field-error-type[`error.type`] +* {ecs-ref}/ecs-error.html#field-error-message[`error.message`] +* {ecs-ref}/ecs-event.html#field-event-dataset[`event.dataset`] +* {ecs-ref}/ecs-cloud.html#field-cloud-provider[`cloud.provider`] +* {ecs-ref}/ecs-cloud.html#field-cloud-availability-zone[`cloud.availability_zone`] +* {ecs-ref}/ecs-cloud.html#field-cloud-region[`cloud.region`] +* {ecs-ref}/ecs-host.html#field-host-hostname[`host.hostname`] +* {ecs-ref}/ecs-host.html#field-host-name[`host.name`] +* {ecs-ref}/ecs-container.html#field-container-id[`container.id`] +* {ecs-ref}/ecs-container.html#field-container-name[`container.name`] +* 
{ecs-ref}/ecs-orchestrator.html#field-orchestrator-namespace[`orchestrator.namespace`] +* {ecs-ref}/ecs-orchestrator.html#field-orchestrator-cluster-id[`orchestrator.cluster.id`] +* {ecs-ref}/ecs-orchestrator.html#field-orchestrator-cluster-name[`orchestrator.cluster.name`] +* {ecs-ref}/ecs-orchestrator.html#field-orchestrator-resource-id[`orchestrator.resource.id`] +* {ecs-ref}/ecs-orchestrator.html#field-orchestrator-resource-name[`orchestrator.resource.name`] + +Dotted field names are expanded to objects so that they can be used interchangeably with nested objects. For example, the following documents are treated identically: `{"log.level": "INFO"}`, `{"log": { "level": "INFO"} }`. + +`_metadata`:: +(Optional, object) +Marks this line as a metadata line. +Provides metadata that will be merged into subsequent events. +If a metadata event is provided as the first line, the metadata is added to all log events. +If a metadata event is provided after the first line, the metadata is added to all subsequent log events until another metadata event is provided. +This way you can easily add global metadata and send logs from multiple datasets in a single request, providing dataset-specific metadata. + +[discrete] +[[logs-api-response-body]] +==== {api-response-body-title} + +The log API's response body is always empty. + +Status codes: + +* 202 Accepted: The log events have been received and are processed in the background. They should be searchable after a short while. +* 500 Internal Server Error: There was an error while processing the log events. Some logs may have been lost. + +[discrete] +[[logs-api-example]] +=== {api-examples-title} + +Ingests a single log into the `logs-myapp-default` data stream.
+ +[source,console] +------------------------------------------------------------ +POST _logs/myapp?service.name=myapp <1> +{"@timestamp": 1463990734853, "message": "Hello", "custom": "value"} <2> +------------------------------------------------------------ + +<1> Provides global metadata that applies to all log events in the request via query parameters. +<2> Specifies the timestamp in milliseconds since epoch. + +After a short while, the logs become searchable. +Even though `custom` is not among the <>, +you can use it in searches and aggregations as it is mapped as a <>. + +//// +[source,console] +---- +POST logs-myapp-default/_refresh +---- +// TEST[continued] +// commented out to avoid documenting that a _refresh will always be sufficient to make the logs searchable +// in the future, logs may be buffered and asynchronously processed +//// + +[source,console] +------------------------------------------------------------ +POST logs-myapp-default/_search?q=custom:value +------------------------------------------------------------ +// TEST[continued] + +The API returns the following response: + +[source,console-result] +---- +{ + "took": 5, + "timed_out": false, + "_shards": { + "total": 1, + "successful": 1, + "skipped": 0, + "failed": 0 + }, + "hits": { + "total": { + "value": 1, + "relation": "eq" + }, + "max_score": 1.0, + "hits": [ + { + "_index": ".ds-logs-myapp-default-2016.05.23-000001", + "_id": "FKgQT4IBWsM7OYMsIp0N", + "_score": 1.0, + "_source": { + "@timestamp": 1463990734853, + "message": "Hello", + "custom": "value", + "service": { + "name": "myapp" + }, + "data_stream": { + "type": "logs", + "dataset": "myapp", + "namespace": "default" + } + } + } + ] + } +} +---- +// TESTRESPONSE[s/"took": 5/"took": $body.took/] +// TESTRESPONSE[s/"_index": ".*"/"_index": $body.hits.hits.0._index/] +// TESTRESPONSE[s/"_id": ".*"/"_id": $body.hits.hits.0._id/] +// TESTRESPONSE[s/"_source": \{\n/"_source": \{\n"error_trace": "true",\n/] +// The test
system adds an error_trace:true parameter to all requests, +// including the logs API which interprets it as global metadata that's added to every event + +''' + +Ingests a single log into the `logs-myapp-default` data stream. + +[source,console] +------------------------------------------------------------ +POST _logs/myapp +{"_metadata": {"service.name": "myapp"}} <1> +{"@timestamp": "2016-05-23T08:05:34.853Z", "message": "Hello World"} <2> +------------------------------------------------------------ + +<1> Provides global metadata that applies to all log events in the request via a global metadata event. +<2> Specifies the timestamp as an ISO date string. + +''' + +Ingests two log events into the `logs-myapp-default` and `logs-my_other_app-default` data streams, respectively. +Provides metadata via local metadata events. + +[source,console] +------------------------------------------------------------ +POST _logs +{"_metadata": {"service.name": "myapp"}} <1> +{"_metadata": {"data_stream.dataset": "myapp"}} <2> +{"@timestamp": "2016-05-23T08:05:34.853Z", "message": "hello app"} <3> +{"_metadata": {"data_stream.dataset": "my_other_app"}} <4> +{"@timestamp": "2016-05-23T08:05:34.853Z", "message": "other app"} <5> +------------------------------------------------------------ + +<1> Provides global metadata that applies to all log events in the request via a global metadata event. +<2> Provides local metadata that gets merged into all subsequent log lines until the next local metadata object is provided. +In this case, the metadata applies to the next event. +<3> This log event will have the following metadata: +`"service.name": "myapp"` (from the global metadata object) and +`"data_stream.dataset": "myapp"` (from the first local metadata object) +<4> Provides local metadata that invalidates the previous local metadata. +It gets merged into all subsequent log lines until the next local metadata object is provided.
+In this case, the metadata applies to the last event. +<5> This log event will have the following metadata: +`"service.name": "myapp"` (from the global metadata object) and +`"data_stream.dataset": "my_other_app"` (from the second local metadata object) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JsonProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JsonProcessor.java index a79954de0f35c..59f9cf65da2dd 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JsonProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JsonProcessor.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.util.MapUtils; import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; @@ -108,7 +109,7 @@ public static void apply(Map ctx, String fieldName, boolean allo @SuppressWarnings("unchecked") Map map = (Map) value; if (conflictStrategy == ConflictStrategy.MERGE) { - recursiveMerge(ctx, map); + MapUtils.recursiveMerge(ctx, map); } else { ctx.putAll(map); } @@ -117,26 +118,6 @@ public static void apply(Map ctx, String fieldName, boolean allo } } - public static void recursiveMerge(Map target, Map from) { - for (String key : from.keySet()) { - if (target.containsKey(key)) { - Object targetValue = target.get(key); - Object fromValue = from.get(key); - if (targetValue instanceof Map && fromValue instanceof Map) { - @SuppressWarnings("unchecked") - Map targetMap = (Map) targetValue; - @SuppressWarnings("unchecked") - Map fromMap = (Map) fromValue; - recursiveMerge(targetMap, fromMap); - } else { - target.put(key, fromValue); - } - } else { - target.put(key, from.get(key)); - } - } - } - @Override public IngestDocument execute(IngestDocument document) throws Exception { if 
(addToRoot) { diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/logs.json b/rest-api-spec/src/main/resources/rest-api-spec/api/logs.json new file mode 100644 index 0000000000000..49ce4637efd71 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/logs.json @@ -0,0 +1,57 @@ +{ + "logs":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/logs.html", + "description":"Provides a simple JSON API to ingest log events into Elasticsearch." + }, + "stability":"experimental", + "visibility":"public", + "headers":{ + "accept": [ "application/json"], + "content_type": ["application/x-ndjson"] + }, + "url":{ + "paths":[ + { + "path":"/_logs", + "methods":[ + "POST" + ] + }, + { + "path":"/_logs/{dataset}", + "methods":[ + "POST" + ], + "parts":{ + "dataset":{ + "type":"string", + "description":"Describes the ingested data and its structure. Defaults to 'generic'." + } + } + }, + { + "path":"/_logs/{dataset}/{namespace}", + "methods":[ + "POST" + ], + "parts":{ + "dataset":{ + "type":"string", + "description":"Describes the ingested data and its structure." + }, + "namespace":{ + "type":"string", + "description":"A user-configurable arbitrary grouping, such as an environment (dev, prod, or qa), a team, or a strategic business unit."
+ } + } + } + ] + }, + "body":{ + "description":"The log events, separated by newlines", + "required":true, + "serialize":"bulk" + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 9b6ec86a65a51..cd29608b4f668 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -406,6 +406,7 @@ import org.elasticsearch.rest.action.ingest.RestGetPipelineAction; import org.elasticsearch.rest.action.ingest.RestPutPipelineAction; import org.elasticsearch.rest.action.ingest.RestSimulatePipelineAction; +import org.elasticsearch.rest.action.logs.RestLogsAction; import org.elasticsearch.rest.action.search.RestClearScrollAction; import org.elasticsearch.rest.action.search.RestCountAction; import org.elasticsearch.rest.action.search.RestExplainAction; @@ -882,6 +883,9 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestUpdateDesiredNodesAction()); registerHandler.accept(new RestDeleteDesiredNodesAction()); + // Logs API + registerHandler.accept(new RestLogsAction()); + for (ActionPlugin plugin : actionPlugins) { for (RestHandler handler : plugin.getRestHandlers( settings, diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index c7c629bb9045f..6dd73ca100d42 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -129,7 +129,7 @@ public BulkRequest add(DocWriteRequest request) { /** * Adds a list of requests to be executed. Either index or delete requests. 
*/ - public BulkRequest add(Iterable> requests) { + public BulkRequest add(Iterable> requests) { for (DocWriteRequest request : requests) { add(request); } diff --git a/server/src/main/java/org/elasticsearch/common/util/MapUtils.java b/server/src/main/java/org/elasticsearch/common/util/MapUtils.java new file mode 100644 index 0000000000000..82915a2bdc6da --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/MapUtils.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.common.util; + +import java.util.Map; + +public class MapUtils { + public static void recursiveMerge(Map target, Map from) { + for (String key : from.keySet()) { + if (target.containsKey(key)) { + Object targetValue = target.get(key); + Object fromValue = from.get(key); + if (targetValue instanceof Map && fromValue instanceof Map) { + @SuppressWarnings("unchecked") + Map targetMap = (Map) targetValue; + @SuppressWarnings("unchecked") + Map fromMap = (Map) fromValue; + recursiveMerge(targetMap, fromMap); + } else { + target.put(key, fromValue); + } + } else { + target.put(key, from.get(key)); + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/rest/action/logs/DataStreamNamingUtils.java b/server/src/main/java/org/elasticsearch/rest/action/logs/DataStreamNamingUtils.java new file mode 100644 index 0000000000000..db2bafaf133a1 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/logs/DataStreamNamingUtils.java @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.rest.action.logs; + +import java.util.Locale; + +public class DataStreamNamingUtils { + + public static final String DATA_STREAM = "data_stream"; + public static final String DATA_STREAM_TYPE = DATA_STREAM + ".type"; + public static final String DATA_STREAM_DATASET = DATA_STREAM + ".dataset"; + public static final String DATA_STREAM_NAMESPACE = DATA_STREAM + ".namespace"; + + private static final char[] DISALLOWED_IN_DATASET = new char[] { '\\', '/', '*', '?', '\"', '<', '>', '|', ' ', ',', '#', ':', '-' }; + private static final char[] DISALLOWED_IN_NAMESPACE = new char[] { '\\', '/', '*', '?', '\"', '<', '>', '|', ' ', ',', '#', ':' }; + private static final int MAX_LENGTH = 100; + private static final char REPLACEMENT_CHAR = '_'; + + public static String sanitizeDataStreamDataset(String dataset) { + return sanitizeDataStreamField(dataset, DISALLOWED_IN_DATASET); + } + + public static String sanitizeDataStreamNamespace(String namespace) { + return sanitizeDataStreamField(namespace, DISALLOWED_IN_NAMESPACE); + } + + private static String sanitizeDataStreamField(String s, char[] disallowedInDataset) { + if (s == null) { + return null; + } + s = s.toLowerCase(Locale.ROOT); + s = s.substring(0, Math.min(s.length(), MAX_LENGTH)); + for (char c : disallowedInDataset) { + s = s.replace(c, REPLACEMENT_CHAR); + } + return s; + } +} diff --git a/server/src/main/java/org/elasticsearch/rest/action/logs/RestLogsAction.java b/server/src/main/java/org/elasticsearch/rest/action/logs/RestLogsAction.java new file mode 100644 index 0000000000000..a82681cb7d6ba --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/logs/RestLogsAction.java @@ -0,0 +1,252 @@ +/* + * Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.rest.action.logs; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.internal.Requests; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.util.MapUtils; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestActionListener; +import org.elasticsearch.xcontent.XContent; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; + +import static java.util.function.Predicate.not; +import static org.elasticsearch.rest.RestRequest.Method.POST; +import static org.elasticsearch.rest.action.logs.DataStreamNamingUtils.DATA_STREAM; +import static org.elasticsearch.rest.action.logs.DataStreamNamingUtils.DATA_STREAM_DATASET; +import static 
org.elasticsearch.rest.action.logs.DataStreamNamingUtils.DATA_STREAM_NAMESPACE; +import static org.elasticsearch.rest.action.logs.DataStreamNamingUtils.DATA_STREAM_TYPE; +import static org.elasticsearch.xcontent.XContentParserConfiguration.EMPTY; + +public class RestLogsAction extends BaseRestHandler { + + private static final Logger logger = LogManager.getLogger(RestLogsAction.class); + + @Override + public String getName() { + return "logs_action"; + } + + @Override + public List routes() { + return List.of( + new Route(POST, "/_logs"), + new Route(POST, "/_logs/{data_stream.dataset}"), + new Route(POST, "/_logs/{data_stream.dataset}/{data_stream.namespace}") + ); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + Map globalMetadata = new HashMap<>(); + Map localMetadata = new HashMap<>(); + Map params = request.params(); + params.entrySet() + .stream() + .filter(not(e -> e.getKey().startsWith("_"))) + .forEach(e -> addPath(globalMetadata, e.getKey(), request.param(e.getKey()))); + + List indexRequests = new ArrayList<>(); + XContent xContent = request.getXContentType().xContent(); + BytesReference content = request.content(); + byte separator = xContent.streamSeparator(); + for (int offset = 0, line = 0, endOfEvent; offset < content.length() - 1; offset = endOfEvent + 1, line++) { + endOfEvent = content.indexOf(separator, offset); + if (endOfEvent == -1) { + endOfEvent = content.length(); + } + try (XContentParser parser = xContent.createParser(EMPTY, content.array(), offset, endOfEvent - offset)) { + Map event = null; + try { + event = parser.map(); + if (event.size() == 1 && event.containsKey("_metadata")) { + Map metadata = getMetadata(event); + expandDots(metadata); + if (line == 0) { + MapUtils.recursiveMerge(globalMetadata, metadata); + } else { + localMetadata = metadata; + } + continue; + } else { + expandDots(event); + event = mergeMetadata(event, globalMetadata, 
localMetadata); + applyDefaultValues(event); + } + } catch (Exception e) { + event = mergeMetadata(Map.of(), globalMetadata, localMetadata); + event.put("message", content.slice(offset, endOfEvent - offset).utf8ToString()); + event = createDlqDoc(event, e); + } + indexRequests.add(Requests.indexRequest(routeToDataStream(event)).opType(DocWriteRequest.OpType.CREATE).source(event)); + } + } + + return channel -> { + client.bulk(Requests.bulkRequest().add(indexRequests), new RestActionListener<>(channel) { + @Override + protected void processResponse(BulkResponse bulkItemResponses) throws Exception { + if (bulkItemResponses.hasFailures() == false) { + sendResponse(channel, RestStatus.ACCEPTED, b -> {}); + return; + } + BulkRequest retryBulk = Requests.bulkRequest(); + Arrays.stream(bulkItemResponses.getItems()).filter(BulkItemResponse::isFailed).forEach(failedRequest -> { + IndexRequest originalRequest = indexRequests.get(failedRequest.getItemId()); + Map doc = originalRequest.sourceAsMap(); + Exception cause = failedRequest.getFailure().getCause(); + doc = createDlqDoc(doc, cause); + retryBulk.add(Requests.indexRequest(routeToDataStream(doc)).opType(DocWriteRequest.OpType.CREATE).source(doc)); + }); + client.bulk(retryBulk, new RestActionListener<>(channel) { + @Override + protected void processResponse(BulkResponse bulkItemResponses) throws Exception { + if (bulkItemResponses.hasFailures() == false) { + sendResponse(channel, RestStatus.ACCEPTED, b -> {}); + } else { + sendResponse(channel, RestStatus.INTERNAL_SERVER_ERROR, b -> {}); + logger.error( + "Failed to ingest logs: re-try batch has failures. 
First failure: {}", + Arrays.stream(bulkItemResponses.getItems()) + .filter(BulkItemResponse::isFailed) + .findFirst() + .map(BulkItemResponse::getFailureMessage) + .orElse(null) + ); + } + } + }); + } + }); + }; + } + + private Map createDlqDoc(Map doc, Exception cause) { + Map dlqDoc = new HashMap<>(); + dlqDoc.put("@timestamp", Instant.now().toString()); + addPath(dlqDoc, "event.original", doc); + if (cause != null) { + addPath(dlqDoc, "error.type", ElasticsearchException.getExceptionName(cause)); + addPath(dlqDoc, "error.message", cause.getMessage()); + } + addPath(dlqDoc, DATA_STREAM_TYPE, "logs"); + addPath(dlqDoc, DATA_STREAM_DATASET, "dlq"); + @SuppressWarnings("unchecked") + Map dataStream = (Map) doc.getOrDefault(DATA_STREAM, Map.of()); + addPath(dlqDoc, DATA_STREAM_NAMESPACE, dataStream.getOrDefault("namespace", "default")); + return dlqDoc; + } + + private Map mergeMetadata( + Map event, + Map globalMetadata, + Map localMetadata + ) { + HashMap doc = new HashMap<>(globalMetadata); + MapUtils.recursiveMerge(doc, localMetadata); + MapUtils.recursiveMerge(doc, event); + return doc; + } + + private void applyDefaultValues(Map doc) { + doc.computeIfAbsent("@timestamp", k -> Instant.now().toString()); + doc.computeIfAbsent("data_stream", k -> new HashMap<>()); + @SuppressWarnings("unchecked") + Map dataStream = (Map) doc.get("data_stream"); + dataStream.put("type", "logs"); + dataStream.computeIfPresent("dataset", (k, v) -> DataStreamNamingUtils.sanitizeDataStreamDataset(v)); + dataStream.computeIfPresent("namespace", (k, v) -> DataStreamNamingUtils.sanitizeDataStreamNamespace(v)); + dataStream.putIfAbsent("dataset", "generic"); + dataStream.putIfAbsent("namespace", "default"); + } + + /* + * routing based on data_stream.* fields + * this part will be handled by document based routing in the future + * for example, by a routing pipeline that is attached to the logs-router-default data stream + */ + private String routeToDataStream(Map doc) { + 
@SuppressWarnings("unchecked") + Map dataStream = (Map) doc.getOrDefault("data_stream", Map.of()); + return "logs-" + dataStream.getOrDefault("dataset", "generic") + "-" + dataStream.getOrDefault("namespace", "default"); + } + + public void sendResponse(RestChannel channel, RestStatus status, Consumer builderConsumer) throws IOException { + try (XContentBuilder builder = channel.newBuilder()) { + builderConsumer.accept(builder); + channel.sendResponse(new RestResponse(status, builder)); + } + } + + private Map getMetadata(Map event) { + Object metadata = event.get("_metadata"); + if (metadata instanceof Map) { + @SuppressWarnings("unchecked") + Map metadataMap = (Map) metadata; + return metadataMap; + } + return Map.of(); + } + + public static void expandDots(Map doc) { + for (String key : new ArrayList<>(doc.keySet())) { + if (key.contains(".")) { + Object value = doc.remove(key); + addPath(doc, key, value); + } + } + } + + private static void addPath(Map doc, String path, Object value) { + Map parent = doc; + String[] pathElements = path.split("\\."); + for (int i = 0; i < pathElements.length - 1; i++) { + String pathElement = pathElements[i]; + if (parent.containsKey(pathElement) == false) { + parent.put(pathElement, new HashMap<>()); + } + Object potentialParent = parent.get(pathElement); + if (potentialParent instanceof Map) { + // as this is a json object, if it's a map, it's guaranteed to be a Map + // that's because there can't be non-string keys in json objects + @SuppressWarnings("unchecked") + Map mapParent = (Map) potentialParent; + parent = mapParent; + } else { + // conflict, put the dotted key back in + doc.put(path, value); + return; + } + } + parent.put(pathElements[pathElements.length - 1], value); + } +} diff --git a/server/src/test/java/org/elasticsearch/rest/action/logs/RestLogsActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/logs/RestLogsActionTests.java new file mode 100644 index 0000000000000..03576acfca1f9 --- /dev/null 
+++ b/server/src/test/java/org/elasticsearch/rest/action/logs/RestLogsActionTests.java
@@ -0,0 +1,280 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.rest.action.logs;
+
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.index.mapper.MapperParsingException;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.test.rest.FakeRestRequest;
+import org.elasticsearch.test.rest.RestActionTestCase;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentFactory;
+import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xcontent.XContentParserConfiguration;
+import org.elasticsearch.xcontent.XContentType;
+import org.junit.Before;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.elasticsearch.action.DocWriteRequest.OpType.CREATE;
+import static org.mockito.Mockito.when;
+
+public class RestLogsActionTests extends RestActionTestCase {
+
+    @Before
+    public void setUpAction() {
+        controller().registerHandler(new RestLogsAction());
+    }
+
+    public void testIngestJsonLogs() {
+        RestRequest req = createLogsRequest("/_logs", Map.of("message", "Hello World"), Map.of("foo", "bar"));
+
+        verifyingClient.setExecuteVerifier((BiFunction<ActionType<BulkResponse>, BulkRequest, BulkResponse>) (actionType, request) -> {
+            assertEquals(2, request.requests().size());
+            IndexRequest indexRequest = (IndexRequest) request.requests().get(0);
+            assertDataStreamFields("generic", "default", indexRequest);
+            assertEquals(CREATE, indexRequest.opType());
+            assertEquals("Hello World", ((IndexRequest) request.requests().get(0)).sourceAsMap().get("message"));
+            assertEquals("bar", ((IndexRequest) request.requests().get(1)).sourceAsMap().get("foo"));
+            return Mockito.mock(BulkResponse.class);
+        });
+        assertEquals(0, dispatchRequest(req).errors().get());
+    }
+
+    public void testObjectScalarClashWithMetadata() {
+        RestRequest req = createLogsRequest("/_logs", Map.of("_metadata", Map.of("foo", Map.of("bar", "baz"))), Map.of("foo", "bar"));
+
+        verifyingClient.setExecuteVerifier((BiFunction<ActionType<BulkResponse>, BulkRequest, BulkResponse>) (actionType, request) -> {
+            assertEquals(1, request.requests().size());
+            Map<String, Object> source = ((IndexRequest) request.requests().get(0)).sourceAsMap();
+            assertEquals("bar", source.get("foo"));
+            assertNull(source.get("foo.bar"));
+            return Mockito.mock(BulkResponse.class);
+        });
+        assertEquals(0, dispatchRequest(req).errors().get());
+    }
+
+    public void testObjectScalarClashInDocument() {
+        RestRequest req = createLogsRequest("/_logs", Map.of("foo.bar", "baz", "foo", "bar"));
+
+        verifyingClient.setExecuteVerifier((BiFunction<ActionType<BulkResponse>, BulkRequest, BulkResponse>) (actionType, request) -> {
+            assertEquals(1, request.requests().size());
+            Map<String, Object> source = ((IndexRequest) request.requests().get(0)).sourceAsMap();
+            assertEquals("bar", source.get("foo"));
+            assertEquals("baz", source.get("foo.bar"));
+            return Mockito.mock(BulkResponse.class);
+        });
+        assertEquals(0, dispatchRequest(req).errors().get());
+    }
+
+    public void testInvalidJson() {
+        RestRequest req = createLogsRequest("/_logs/foo", "{}\n{\"message\": \"missing end quote}");
+        verifyingClient.setExecuteVerifier((BiFunction<ActionType<BulkResponse>, BulkRequest, BulkResponse>) (actionType, request) -> {
+            assertEquals(2, request.requests().size());
+            IndexRequest indexRequest = (IndexRequest) request.requests().get(1);
+            assertDataStreamFields("dlq", "default", indexRequest);
+            Map<String, Object> doc = ((IndexRequest) request.requests().get(1)).sourceAsMap();
+            assertEquals("foo", getPath(doc, "event.original.data_stream.dataset"));
+            assertEquals("{\"message\": \"missing end quote}", getPath(doc, "event.original.message"));
+            assertEquals("json_e_o_f_exception", getPath(doc, "error.type"));
+            assertTrue(((String) getPath(doc, "error.message")).contains("{\"message\": \"missing end quote}"));
+            return Mockito.mock(BulkResponse.class);
+        });
+        assertEquals(0, dispatchRequest(req).errors().get());
+    }
+
+    public void testMetadata() {
+        RestRequest req = createLogsRequest(
+            "/_logs",
+            Map.of("_metadata", Map.of("global", true)),
+            Map.of("_metadata", Map.of("local1", true)),
+            Map.of("foo", "bar"),
+            Map.of("_metadata", Map.of("local2", true)),
+            Map.of("foo", "bar")
+        );
+        verifyingClient.setExecuteVerifier((BiFunction<ActionType<BulkResponse>, BulkRequest, BulkResponse>) (actionType, request) -> {
+            assertEquals(2, request.requests().size());
+            Map<String, Object> doc1 = ((IndexRequest) request.requests().get(0)).sourceAsMap();
+            Map<String, Object> doc2 = ((IndexRequest) request.requests().get(1)).sourceAsMap();
+            assertEquals(true, doc1.get("global"));
+            assertEquals(true, doc1.get("local1"));
+            assertEquals(true, doc2.get("global"));
+            assertEquals(true, doc2.get("local2"));
+            return Mockito.mock(BulkResponse.class);
+        });
+        assertEquals(0, dispatchRequest(req).errors().get());
+    }
+
+    public void testMergeGlobalAndEventMetadata() {
+        RestRequest req = createLogsRequest("/_logs/foo", Map.of("data_stream.namespace", "bar"));
+        verifyingClient.setExecuteVerifier((BiFunction<ActionType<BulkResponse>, BulkRequest, BulkResponse>) (actionType, request) -> {
+            assertEquals(1, request.requests().size());
+            Map<String, Object> doc = ((IndexRequest) request.requests().get(0)).sourceAsMap();
+            assertEquals("foo", getPath(doc, "data_stream.dataset"));
+            assertEquals("bar", getPath(doc, "data_stream.namespace"));
+            return Mockito.mock(BulkResponse.class);
+        });
+        assertEquals(0, dispatchRequest(req).errors().get());
+    }
+
+    private void assertDataStreamFields(String dataset, String namespace, DocWriteRequest<?> docWriteRequest) {
+        IndexRequest indexRequest = (IndexRequest) docWriteRequest;
+        assertEquals("logs-" + dataset + "-" + namespace, indexRequest.index());
+        assertEquals(Map.of("type", "logs", "dataset", dataset, "namespace", namespace), indexRequest.sourceAsMap().get("data_stream"));
+    }
+
+    public void testPathMetadata() {
+        RestRequest req = createLogsRequest("/_logs/foo/bar", Map.of("message", "Hello World"));
+        verifyingClient.setExecuteVerifier((BiFunction<ActionType<BulkResponse>, BulkRequest, BulkResponse>) (actionType, request) -> {
+            assertEquals(1, request.requests().size());
+            assertDataStreamFields("foo", "bar", request.requests().get(0));
+            return Mockito.mock(BulkResponse.class);
+        });
+        assertEquals(0, dispatchRequest(req).errors().get());
+    }
+
+    public void testSendFailedDocumentsToDlq() {
+        RestRequest req = createLogsRequest("/_logs/foo/bar", Map.of("message", "Hello World"));
+        AtomicBoolean firstRequest = new AtomicBoolean(true);
+        verifyingClient.setExecuteVerifier((BiFunction<ActionType<BulkResponse>, BulkRequest, BulkResponse>) (actionType, request) -> {
+            if (firstRequest.get()) {
+                firstRequest.set(false);
+                assertEquals(1, request.requests().size());
+                assertDataStreamFields("foo", "bar", request.requests().get(0));
+                return createMockBulkFailureResponse(new MapperParsingException("bad foo"));
+            } else {
+                assertEquals(1, request.requests().size());
+                IndexRequest indexRequest = (IndexRequest) request.requests().get(0);
+                assertDataStreamFields("dlq", "bar", indexRequest);
+                Map<String, Object> doc = indexRequest.sourceAsMap();
+                assertEquals("mapper_parsing_exception", getPath(doc, "error.type"));
+                assertEquals("bad foo", getPath(doc, "error.message"));
+                assertEquals("logs", getPath(doc, "event.original.data_stream.type"));
+                assertEquals("foo", getPath(doc, "event.original.data_stream.dataset"));
+                assertEquals("bar", getPath(doc, "event.original.data_stream.namespace"));
+                return Mockito.mock(BulkResponse.class);
+            }
+        });
+        assertEquals(0, dispatchRequest(req).errors().get());
+    }
+
+    public void testReturnErrorWhenRetryFails() {
+        RestRequest req = createLogsRequest("/_logs/foo", Map.of("message", "Hello World"));
+        verifyingClient.setExecuteVerifier(
+            (BiFunction<ActionType<BulkResponse>, BulkRequest, BulkResponse>) (actionType, request) -> createMockBulkFailureResponse(
+                new EsRejectedExecutionException()
+            )
+        );
+        assertEquals(1, dispatchRequest(req).errors().get());
+    }
+
+    private BulkResponse createMockBulkFailureResponse(Exception exception) {
+        BulkResponse bulkResponse = Mockito.mock(BulkResponse.class);
+        when(bulkResponse.hasFailures()).thenReturn(true);
+
+        BulkItemResponse bulkItemResponse = Mockito.mock(BulkItemResponse.class);
+        when(bulkItemResponse.getItemId()).thenReturn(0);
+        when(bulkItemResponse.isFailed()).thenReturn(true);
+        when(bulkResponse.getItems()).thenReturn(new BulkItemResponse[] { bulkItemResponse });
+
+        BulkItemResponse.Failure failure = Mockito.mock(BulkItemResponse.Failure.class);
+        when(failure.getCause()).thenReturn(exception);
+        when(failure.getStatus()).thenReturn(ExceptionsHelper.status(exception));
+        when(bulkItemResponse.getFailure()).thenReturn(failure);
+
+        return bulkResponse;
+    }
+
+    public void testExpandDots() throws IOException {
+        List<String> testScenarios = List.of("""
+            {"foo.bar":"baz"}
+            {"foo":{"bar":"baz"}}
+            """, """
+            {"foo":"bar","foo.bar":"baz"}
+            {"foo":"bar","foo.bar":"baz"}
+            """, """
+            {"foo":{"bar":"baz"},"foo.baz":"qux"}
+            {"foo":{"baz":"qux","bar":"baz"}}
+            """);
+        for (String testScenario : testScenarios) {
+            String[] split = testScenario.split("\n");
+            Map<String, Object> withExpandedDots = jsonToMap(split[0]);
+            RestLogsAction.expandDots(withExpandedDots);
+            assertEquals(jsonToMap(split[1]), withExpandedDots);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private RestRequest createLogsRequest(String path, Map<String, ?>... ndJson) {
+        return createLogsRequest(
+            path,
+            Arrays.stream(ndJson)
+                .map(j -> (Map<String, Object>) j)
+                .map(this::json)
+                .collect(Collectors.joining(randomBoolean() ? "\n" : "\r\n"))
+        );
+    }
+
+    private RestRequest createLogsRequest(String path, String content) {
+        return createLogsRequest(path, content, Map.of());
+    }
+
+    private RestRequest createLogsRequest(String path, String content, Map<String, String> params) {
+        return new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST)
+            .withPath(path)
+            .withParams(new HashMap<>(params))
+            .withContent(BytesReference.fromByteBuffer(ByteBuffer.wrap(content.getBytes(UTF_8))), XContentType.JSON)
+            .build();
+    }
+
+    private String json(Map<String, Object> map) {
+        try (XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON)) {
+            builder.map(map);
+            return XContentHelper.convertToJson(BytesReference.bytes(builder), false, XContentType.JSON);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Map<String, Object> jsonToMap(String json) throws IOException {
+        try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, json)) {
+            return parser.map();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> T getPath(Map<String, Object> doc, String path) {
+        Map<String, Object> parent = doc;
+        String[] pathElements = path.split("\\.");
+        for (int i = 0; i < pathElements.length - 1; i++) {
+            parent = (Map<String, Object>) parent.get(pathElements[i]);
+            if (parent == null) {
+                return null;
+            }
+        }
+        return (T) parent.get(pathElements[pathElements.length - 1]);
+    }
+
+}
diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java
index 297ea9beafff1..eaa2c87d54089 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java
@@ -1830,6 +1830,8 @@ protected static boolean isXPackTemplate(String name) {
             case "logs":
             case "logs-settings":
             case "logs-mappings":
+            case "logs-dlq":
+            case "logs-dlq-mappings":
             case "metrics":
             case "metrics-settings":
             case "metrics-mappings":
diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestChannel.java b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestChannel.java
index d87be6e716901..8537c033b992e 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestChannel.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestChannel.java
@@ -10,7 +10,6 @@
 import org.elasticsearch.rest.AbstractRestChannel;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestResponse;
-import org.elasticsearch.rest.RestStatus;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -29,7 +28,7 @@ public FakeRestChannel(RestRequest request, boolean detailedErrorsEnabled, int r
     @Override
     public void sendResponse(RestResponse response) {
         this.capturedRestResponse = response;
-        if (response.status() == RestStatus.OK) {
+        if (response.status().getStatus() < 400) {
             responses.incrementAndGet();
         } else {
             errors.incrementAndGet();
diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/RestActionTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/RestActionTestCase.java
index 58f08d75423c6..7153670fb33fa 100644
---
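The `expandDots`/`addPath` pair in `RestLogsAction` above rewrites dotted keys into nested JSON objects, and falls back to keeping the dotted key whenever an intermediate element is already a scalar (the object/scalar clash exercised by the tests). A standalone sketch of the same idea, outside the Elasticsearch codebase (the class and method names here are illustrative, not from the PR):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

public class DotExpander {

    // Rewrites {"foo.bar": "baz"} into {"foo": {"bar": "baz"}}.
    public static void expand(Map<String, Object> doc) {
        for (String key : new ArrayList<>(doc.keySet())) {
            if (key.contains(".")) {
                Object value = doc.remove(key);
                addPath(doc, key, value);
            }
        }
    }

    private static void addPath(Map<String, Object> doc, String path, Object value) {
        Map<String, Object> parent = doc;
        String[] elements = path.split("\\.");
        for (int i = 0; i < elements.length - 1; i++) {
            // create the intermediate object if it does not exist yet
            Object child = parent.computeIfAbsent(elements[i], k -> new HashMap<String, Object>());
            if (child instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> childMap = (Map<String, Object>) child;
                parent = childMap;
            } else {
                // an intermediate element is a scalar: restore the dotted key as-is
                doc.put(path, value);
                return;
            }
        }
        parent.put(elements[elements.length - 1], value);
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new HashMap<>(Map.of("foo.bar", "baz"));
        expand(doc);
        System.out.println(doc); // {foo={bar=baz}}

        Map<String, Object> conflict = new HashMap<>(Map.of("foo", "bar", "foo.bar", "baz"));
        expand(conflict);
        System.out.println(conflict); // dotted key kept because "foo" is already a scalar
    }
}
```

The conflict fallback is what lets the endpoint keep its "never drop logs" promise: a clashing document is indexed with the literal dotted key rather than rejected.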
a/test/framework/src/main/java/org/elasticsearch/test/rest/RestActionTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/RestActionTestCase.java @@ -66,12 +66,13 @@ protected RestController controller() { /** * Sends the given request to the test controller in {@link #controller()}. */ - protected void dispatchRequest(RestRequest request) { + protected FakeRestChannel dispatchRequest(RestRequest request) { FakeRestChannel channel = new FakeRestChannel(request, false, 1); ThreadContext threadContext = verifyingClient.threadPool().getThreadContext(); try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { controller.dispatchRequest(request, channel, threadContext); } + return channel; } /** @@ -110,7 +111,7 @@ public void reset() { * function should return a subclass of {@link ActionResponse} that is appropriate for the action. * @param verifier A function which is called in place of {@link #doExecute(ActionType, ActionRequest, ActionListener)} */ - public void setExecuteVerifier(BiFunction, ActionRequest, R> verifier) { + public void setExecuteVerifier(BiFunction, ? 
extends ActionRequest, R> verifier) { /* * Perform a little generics dance to force the callers to mock * a return type appropriate for the action even though we can't diff --git a/x-pack/plugin/core/src/main/resources/logs-dlq-mappings.json b/x-pack/plugin/core/src/main/resources/logs-dlq-mappings.json new file mode 100644 index 0000000000000..0442f0196e109 --- /dev/null +++ b/x-pack/plugin/core/src/main/resources/logs-dlq-mappings.json @@ -0,0 +1,33 @@ +{ + "template": { + "mappings": { + "dynamic": "false", + "properties": { + "@timestamp": { + "type": "date", + "ignore_malformed": false + }, + "data_stream": { + "properties": { + "type": { + "type": "constant_keyword", + "value": "logs" + }, + "dataset": { + "type": "constant_keyword", + "value": "dlq" + }, + "namespace": { + "type": "constant_keyword" + } + } + } + } + } + }, + "_meta": { + "description": "default mappings for the logs-dlq-* index template installed by x-pack", + "managed": true + }, + "version": ${xpack.stack.template.version} +} diff --git a/x-pack/plugin/core/src/main/resources/logs-dlq-template.json b/x-pack/plugin/core/src/main/resources/logs-dlq-template.json new file mode 100644 index 0000000000000..71177faa3ef23 --- /dev/null +++ b/x-pack/plugin/core/src/main/resources/logs-dlq-template.json @@ -0,0 +1,15 @@ +{ + "index_patterns": ["logs-dlq-*"], + "priority": 200, + "data_stream": {}, + "composed_of": [ + "logs-dlq-mappings", + "logs-settings" + ], + "allow_auto_create": true, + "_meta": { + "description": "default logs-dlq-* template installed by x-pack", + "managed": true + }, + "version": ${xpack.stack.template.version} +} diff --git a/x-pack/plugin/core/src/main/resources/logs-mappings.json b/x-pack/plugin/core/src/main/resources/logs-mappings.json index 1851f36947cc8..f77563407a55f 100644 --- a/x-pack/plugin/core/src/main/resources/logs-mappings.json +++ b/x-pack/plugin/core/src/main/resources/logs-mappings.json @@ -1,14 +1,312 @@ { "template": { "mappings": { + 
"dynamic_templates": [ + { + "match_ip": { + "match_mapping_type": "string", + "match": "ip", + "runtime": { + "type": "ip" + } + } + }, + { + "message": { + "path_match": "message", + "mapping": { + "type": "match_only_text" + } + } + }, + { + "log_level": { + "path_match": "log.level", + "mapping": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + { + "log_logger": { + "path_match": "log.logger", + "mapping": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + { + "trace_id": { + "path_match": "trace.id", + "mapping": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + { + "span_id": { + "path_match": "span.id", + "mapping": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + { + "transaction_id": { + "path_match": "transaction.id", + "mapping": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + { + "service_name": { + "path_match": "service.name", + "mapping": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + { + "service_version": { + "path_match": "service.version", + "mapping": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + { + "service_environment": { + "path_match": "service.environment", + "mapping": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + { + "process_pid": { + "path_match": "process.pid", + "mapping": { + "type": "long" + } + } + }, + { + "process_thread_name": { + "path_match": "process.thread.name", + "mapping": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + { + "error_type": { + "path_match": "error.type", + "mapping": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + { + "error_message": { + "path_match": "error.message", + "mapping": { + "type": "match_only_text" + } + } + }, + { + "event_dataset": { + "path_match": "event.dataset", + "mapping": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + { + "cloud_provider": { + "path_match": "cloud.provider", + "mapping": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, 
+ { + "cloud_availability_zone": { + "path_match": "cloud.availability_zone", + "mapping": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + { + "cloud_region": { + "path_match": "cloud.region", + "mapping": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + { + "host_hostname": { + "path_match": "host.hostname", + "mapping": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + { + "host_name": { + "path_match": "host.name", + "mapping": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + { + "container_id": { + "path_match": "container.id", + "mapping": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + { + "container_name": { + "path_match": "container.name", + "mapping": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + { + "orchestrator_namespace": { + "path_match": "orchestrator.namespace", + "mapping": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + { + "orchestrator_id": { + "path_match": "orchestrator.cluster.id", + "mapping": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + { + "orchestrator_id": { + "path_match": "orchestrator.cluster.name", + "mapping": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + { + "orchestrator_resource_id": { + "path_match": "orchestrator.resource.id", + "mapping": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + { + "orchestrator_resource_name": { + "path_match": "orchestrator.resource.name", + "mapping": { + "type": "keyword", + "ignore_above": 1024 + } + } + } + ], + "dynamic": "runtime", + "date_detection": false, "properties": { + "@timestamp": { + "type": "date", + "ignore_malformed": false + }, "data_stream": { "properties": { "type": { "type": "constant_keyword", "value": "logs" + }, + "dataset": { + "type": "constant_keyword" + }, + "namespace": { + "type": "constant_keyword" + } + } + }, + "log": { + "type": "object" + }, + "error": { + "type": "object" + }, + "event": { + "type": "object" + }, + "trace": { + 
"type": "object" + }, + "transaction": { + "type": "object" + }, + "span": { + "type": "object" + }, + "cloud": { + "type": "object" + }, + "container": { + "type": "object" + }, + "host": { + "type": "object" + }, + "orchestrator": { + "properties": { + "resource": { + "type": "object" + } + } + }, + "process": { + "properties": { + "thread": { + "type": "object" } } + }, + "service": { + "type": "object" } } } diff --git a/x-pack/plugin/core/src/main/resources/logs-settings.json b/x-pack/plugin/core/src/main/resources/logs-settings.json index 7370e40accc79..134100b73398c 100644 --- a/x-pack/plugin/core/src/main/resources/logs-settings.json +++ b/x-pack/plugin/core/src/main/resources/logs-settings.json @@ -8,6 +8,9 @@ "codec": "best_compression", "query": { "default_field": ["message"] + }, + "mapping": { + "ignore_malformed": true } } } diff --git a/x-pack/plugin/core/src/main/resources/logs-template.json b/x-pack/plugin/core/src/main/resources/logs-template.json index 198e0d04ab5af..9c21a2b6ae504 100644 --- a/x-pack/plugin/core/src/main/resources/logs-template.json +++ b/x-pack/plugin/core/src/main/resources/logs-template.json @@ -4,7 +4,6 @@ "data_stream": {}, "composed_of": [ "logs-mappings", - "data-streams-mappings", "logs-settings" ], "allow_auto_create": true, diff --git a/x-pack/plugin/stack/qa/rest/build.gradle b/x-pack/plugin/stack/qa/rest/build.gradle index 770a52fde68be..4903c2dcf0c5b 100644 --- a/x-pack/plugin/stack/qa/rest/build.gradle +++ b/x-pack/plugin/stack/qa/rest/build.gradle @@ -8,7 +8,7 @@ dependencies { restResources { restApi { - include '_common', 'cluster', 'indices', 'index', 'snapshot', 'ilm', 'slm', 'stack', 'indices' + include '_common', 'cluster', 'indices', 'index', 'snapshot', 'ilm', 'slm', 'stack', 'indices', 'search' } } @@ -17,3 +17,7 @@ testClusters.configureEach { setting 'xpack.ml.enabled', 'false' setting 'xpack.license.self_generated.type', 'trial' } + +tasks.named("yamlRestTestV7CompatTransform").configure { task -> + 
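The `logs-*-*` and `logs-dlq-*` templates above only take effect once an event has been routed to a concrete data stream name. Per the routing logic earlier in the diff and the tests, the target is derived from the event's `data_stream` fields, with `generic`/`default` as fallbacks, and failed documents go to the dead letter queue stream for the same namespace. A standalone sketch (the class and method names are illustrative):

```java
import java.util.Map;

public class LogsRouting {

    // Mirrors the routing in RestLogsAction: logs-<dataset>-<namespace>,
    // defaulting to "generic" and "default" when no data_stream fields are set.
    static String targetDataStream(Map<String, Object> doc) {
        @SuppressWarnings("unchecked")
        Map<String, Object> dataStream = (Map<String, Object>) doc.getOrDefault("data_stream", Map.of());
        return "logs-" + dataStream.getOrDefault("dataset", "generic")
            + "-" + dataStream.getOrDefault("namespace", "default");
    }

    // Failed documents are rerouted to logs-dlq-<namespace>, which is what the
    // logs-dlq-* index template above matches (see assertDataStreamFields("dlq", ...)
    // in the tests).
    static String dlqDataStream(Map<String, Object> doc) {
        @SuppressWarnings("unchecked")
        Map<String, Object> dataStream = (Map<String, Object>) doc.getOrDefault("data_stream", Map.of());
        return "logs-dlq-" + dataStream.getOrDefault("namespace", "default");
    }

    public static void main(String[] args) {
        System.out.println(targetDataStream(Map.of())); // logs-generic-default
        System.out.println(targetDataStream(
            Map.of("data_stream", Map.of("dataset", "foo", "namespace", "bar")))); // logs-foo-bar
        System.out.println(dlqDataStream(
            Map.of("data_stream", Map.of("namespace", "bar")))); // logs-dlq-bar
    }
}
```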
task.skipTest("stack/10_basic/Test logs index auto creation", "Strict assertions on the mapping of the default logs-*-* index template prevents making any improvements in a non-breaking way.") +} diff --git a/x-pack/plugin/stack/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/stack/10_basic.yml b/x-pack/plugin/stack/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/stack/10_basic.yml index 4c2d570b2f70b..1df4d61b9933f 100644 --- a/x-pack/plugin/stack/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/stack/10_basic.yml +++ b/x-pack/plugin/stack/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/stack/10_basic.yml @@ -47,6 +47,7 @@ setup: - do: index: index: logs-foo-bar + refresh: true body: "@timestamp": "2020-01-01" message: "test-log-message" @@ -83,10 +84,29 @@ setup: - match: { .$idx0name.mappings.properties.data_stream.properties.namespace.value: "bar" } - is_true: .$idx0name.mappings.properties.message - match: { .$idx0name.mappings.properties.message.type: "match_only_text" } - - match: { .$idx0name.mappings.properties.source.properties.ip.type: "ip" } - - match: { .$idx0name.mappings.properties.log.properties.file.properties.path.type: "keyword" } - match: { .$idx0name.data_stream: "logs-foo-bar" } + - do: + search: + index: logs-foo-bar + body: + query: + bool: + must: + - term: + log.file.path: "/var/log/web/access.log" + - term: + source.ip: "10.1.2.3" + - length: { hits.hits: 1 } + - match: + hits.hits.0._source: + "@timestamp": "2020-01-01" + message: "test-log-message" + source.ip: "10.1.2.3" + log.file.path: "/var/log/web/access.log" + data_stream.type: "logs" + data_stream.dataset: "foo" + data_stream.namespace: "bar" - do: indices.delete_data_stream: name: logs-foo-bar diff --git a/x-pack/plugin/stack/src/main/java/org/elasticsearch/xpack/stack/StackTemplateRegistry.java b/x-pack/plugin/stack/src/main/java/org/elasticsearch/xpack/stack/StackTemplateRegistry.java index c03e213c7fdbd..a42c3b9e9393a 100644 --- 
a/x-pack/plugin/stack/src/main/java/org/elasticsearch/xpack/stack/StackTemplateRegistry.java +++ b/x-pack/plugin/stack/src/main/java/org/elasticsearch/xpack/stack/StackTemplateRegistry.java @@ -66,9 +66,11 @@ public class StackTemplateRegistry extends IndexTemplateRegistry { // Logs components (for matching logs-*-* indices) ////////////////////////////////////////////////////////// public static final String LOGS_MAPPINGS_COMPONENT_TEMPLATE_NAME = "logs-mappings"; + public static final String LOGS_DLQ_MAPPINGS_COMPONENT_TEMPLATE_NAME = "logs-dlq-mappings"; public static final String LOGS_SETTINGS_COMPONENT_TEMPLATE_NAME = "logs-settings"; public static final String LOGS_ILM_POLICY_NAME = "logs"; public static final String LOGS_INDEX_TEMPLATE_NAME = "logs"; + public static final String LOGS_DLQ_INDEX_TEMPLATE_NAME = "logs-dlq"; ////////////////////////////////////////////////////////// // Metrics components (for matching metric-*-* indices) @@ -154,6 +156,12 @@ protected List getPolicyConfigs() { REGISTRY_VERSION, TEMPLATE_VERSION_VARIABLE ), + new IndexTemplateConfig( + LOGS_DLQ_MAPPINGS_COMPONENT_TEMPLATE_NAME, + "/logs-dlq-mappings.json", + REGISTRY_VERSION, + TEMPLATE_VERSION_VARIABLE + ), new IndexTemplateConfig( LOGS_SETTINGS_COMPONENT_TEMPLATE_NAME, "/logs-settings.json", @@ -208,6 +216,7 @@ protected Map getComponentTemplateConfigs() { private static final Map COMPOSABLE_INDEX_TEMPLATE_CONFIGS = parseComposableTemplates( new IndexTemplateConfig(LOGS_INDEX_TEMPLATE_NAME, "/logs-template.json", REGISTRY_VERSION, TEMPLATE_VERSION_VARIABLE), + new IndexTemplateConfig(LOGS_DLQ_INDEX_TEMPLATE_NAME, "/logs-dlq-template.json", REGISTRY_VERSION, TEMPLATE_VERSION_VARIABLE), new IndexTemplateConfig(METRICS_INDEX_TEMPLATE_NAME, "/metrics-template.json", REGISTRY_VERSION, TEMPLATE_VERSION_VARIABLE), new IndexTemplateConfig(SYNTHETICS_INDEX_TEMPLATE_NAME, "/synthetics-template.json", REGISTRY_VERSION, TEMPLATE_VERSION_VARIABLE) ); diff --git 
a/x-pack/plugin/stack/src/test/java/org/elasticsearch/xpack/stack/StackTemplateRegistryTests.java b/x-pack/plugin/stack/src/test/java/org/elasticsearch/xpack/stack/StackTemplateRegistryTests.java index f3d41c3d96e44..597ca54e8e569 100644 --- a/x-pack/plugin/stack/src/test/java/org/elasticsearch/xpack/stack/StackTemplateRegistryTests.java +++ b/x-pack/plugin/stack/src/test/java/org/elasticsearch/xpack/stack/StackTemplateRegistryTests.java @@ -279,6 +279,8 @@ public void testSameOrHigherVersionTemplateNotUpgraded() { versions.put(StackTemplateRegistry.DATA_STREAMS_MAPPINGS_COMPONENT_TEMPLATE_NAME, StackTemplateRegistry.REGISTRY_VERSION); versions.put(StackTemplateRegistry.LOGS_SETTINGS_COMPONENT_TEMPLATE_NAME, StackTemplateRegistry.REGISTRY_VERSION); versions.put(StackTemplateRegistry.LOGS_MAPPINGS_COMPONENT_TEMPLATE_NAME, StackTemplateRegistry.REGISTRY_VERSION); + versions.put(StackTemplateRegistry.LOGS_DLQ_INDEX_TEMPLATE_NAME, StackTemplateRegistry.REGISTRY_VERSION); + versions.put(StackTemplateRegistry.LOGS_DLQ_MAPPINGS_COMPONENT_TEMPLATE_NAME, StackTemplateRegistry.REGISTRY_VERSION); versions.put(StackTemplateRegistry.METRICS_SETTINGS_COMPONENT_TEMPLATE_NAME, StackTemplateRegistry.REGISTRY_VERSION); versions.put(StackTemplateRegistry.METRICS_MAPPINGS_COMPONENT_TEMPLATE_NAME, StackTemplateRegistry.REGISTRY_VERSION); versions.put(StackTemplateRegistry.SYNTHETICS_SETTINGS_COMPONENT_TEMPLATE_NAME, StackTemplateRegistry.REGISTRY_VERSION); @@ -314,6 +316,14 @@ public void testSameOrHigherVersionTemplateNotUpgraded() { StackTemplateRegistry.LOGS_MAPPINGS_COMPONENT_TEMPLATE_NAME, StackTemplateRegistry.REGISTRY_VERSION + randomIntBetween(1, 1000) ); + versions.put( + StackTemplateRegistry.LOGS_DLQ_MAPPINGS_COMPONENT_TEMPLATE_NAME, + StackTemplateRegistry.REGISTRY_VERSION + randomIntBetween(1, 1000) + ); + versions.put( + StackTemplateRegistry.LOGS_DLQ_INDEX_TEMPLATE_NAME, + StackTemplateRegistry.REGISTRY_VERSION + randomIntBetween(1, 1000) + ); versions.put( 
StackTemplateRegistry.METRICS_SETTINGS_COMPONENT_TEMPLATE_NAME, StackTemplateRegistry.REGISTRY_VERSION + randomIntBetween(1, 1000)
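The tests earlier in the diff read nested values back out of a parsed source map with a `getPath` helper, essentially the inverse of `expandDots`. A standalone sketch of that dotted-path lookup (plain Java; the class name is illustrative, and this variant returns `null` on a scalar intermediate instead of casting blindly):

```java
import java.util.Map;

public class PathLookup {

    // Walks a nested Map structure along a dotted path such as "error.type",
    // returning null if any intermediate object is missing or not a Map.
    @SuppressWarnings("unchecked")
    static <T> T getPath(Map<String, Object> doc, String path) {
        Map<String, Object> parent = doc;
        String[] pathElements = path.split("\\.");
        for (int i = 0; i < pathElements.length - 1; i++) {
            Object child = parent.get(pathElements[i]);
            if (child instanceof Map == false) {
                return null;
            }
            parent = (Map<String, Object>) child;
        }
        return (T) parent.get(pathElements[pathElements.length - 1]);
    }

    public static void main(String[] args) {
        Map<String, Object> doc = Map.of("error", Map.of("type", "mapper_parsing_exception"));
        String type = getPath(doc, "error.type");
        System.out.println(type); // mapper_parsing_exception
        System.out.println((Object) getPath(doc, "event.original")); // null
    }
}
```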