From 91d8ada4695f8395250eaae625f8ce7dfe57492d Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 13 Nov 2019 11:00:24 +0100 Subject: [PATCH 1/6] Add templating support to pipeline processor. This commit adds templating support to the pipeline processor's `name` option. Closes #39955 --- .../ingest/processors/pipeline.asciidoc | 2 +- modules/ingest-common/build.gradle | 7 +- .../test/ingest/210_pipeline_processor.yml | 94 +++++++++++++++++++ .../elasticsearch/ingest/IngestService.java | 4 +- .../ingest/PipelineProcessor.java | 20 ++-- .../ingest/TrackingResultProcessor.java | 4 +- .../ingest/IngestServiceTests.java | 2 +- .../ingest/PipelineProcessorTests.java | 18 +++- 8 files changed, 131 insertions(+), 20 deletions(-) diff --git a/docs/reference/ingest/processors/pipeline.asciidoc b/docs/reference/ingest/processors/pipeline.asciidoc index 573ab3b88d3c4..7f1ea2885e69a 100644 --- a/docs/reference/ingest/processors/pipeline.asciidoc +++ b/docs/reference/ingest/processors/pipeline.asciidoc @@ -7,7 +7,7 @@ Executes another pipeline. [options="header"] |====== | Name | Required | Default | Description -| `name` | yes | - | The name of the pipeline to execute +| `name` | yes | - | The name of the pipeline to execute. Supports <>. include::common-options.asciidoc[] |====== diff --git a/modules/ingest-common/build.gradle b/modules/ingest-common/build.gradle index 00c444e50e87d..5b531f381178f 100644 --- a/modules/ingest-common/build.gradle +++ b/modules/ingest-common/build.gradle @@ -27,4 +27,9 @@ dependencies { compileOnly project(':modules:lang-painless') compile project(':libs:elasticsearch-grok') compile project(':libs:elasticsearch-dissect') -} \ No newline at end of file +} + +testClusters.integTest { + // Needed in order to test ingest pipeline templating: + module file(project(':modules:lang-mustache').tasks.bundlePlugin.archiveFile) +} diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml index e375d195bfbc9..ad12d2f4ada6b 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml @@ -108,3 +108,97 @@ teardown: body: {} - match: { error.root_cause.0.type: "exception" } - match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: inner" } + +--- +"Test Pipeline Processor with templating": + - do: + ingest.put_pipeline: + id: "engineering-department" + body: > + { + "processors" : [ + { + "set" : { + "field": "manager", + "value": "john" + } + } + ] + } + - match: { acknowledged: true } + + - do: + ingest.put_pipeline: + id: "sales-department" + body: > + { + "processors" : [ + { + "set" : { + "field": "manager", + "value": "jan" + } + } + ] + } + - match: { acknowledged: true } + + - do: + ingest.put_pipeline: + id: "outer" + body: > + { + "processors" : [ + { + "pipeline" : { + "name": "{{org}}-department" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "outer" + body: > + { + "org": "engineering" + } + + - do: + get: + index: test + id: 1 + - match: { _source.manager: "john" } + + - do: + index: + index: test + id: 2 + pipeline: "outer" + body: > + { + "org": "sales" + } + + - do: + get: + index: test + id: 2 + - match: { _source.manager: "jan" } + + - do: + catch: /illegal_state_exception/ + index: + index: test + id: 3 + pipeline: "outer" + body: > + { + "org": "legal" + } + - match: { error.root_cause.0.type: "exception" } + - match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Pipeline processor configured for non-existent pipeline [legal-department]" } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index b17b530aca9f0..2574a76a64eca 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -431,7 +431,7 @@ public void addIngestClusterStateListener(Consumer listener) { } //package private for testing - static String getProcessorName(Processor processor){ + static String getProcessorName(Processor processor) { // conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name if(processor instanceof ConditionalProcessor){ processor = ((ConditionalProcessor) processor).getInnerProcessor(); @@ -440,7 +440,7 @@ static String getProcessorName(Processor processor){ sb.append(processor.getType()); if(processor instanceof PipelineProcessor){ - String pipelineName = ((PipelineProcessor) processor).getPipelineName(); + String pipelineName = ((PipelineProcessor) processor).getPipelineName().newInstance(Map.of()).execute(); sb.append(":"); sb.append(pipelineName); } diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java index f5e37a1c1235e..634de19695ee0 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java @@ -19,6 +19,8 @@ package org.elasticsearch.ingest; +import org.elasticsearch.script.TemplateScript; + import java.util.Map; import java.util.function.BiConsumer; @@ -26,11 +28,10 @@ public class PipelineProcessor extends AbstractProcessor { public static final String TYPE = "pipeline"; - private final String pipelineName; - + private final TemplateScript.Factory pipelineName; private final IngestService ingestService; - private PipelineProcessor(String tag, String pipelineName, IngestService ingestService) { + private PipelineProcessor(String tag, TemplateScript.Factory pipelineName, IngestService ingestService) { super(tag); this.pipelineName = pipelineName; this.ingestService = ingestService; @@ -38,6 +39,7 @@ private PipelineProcessor(String tag, String pipelineName, IngestService ingestS @Override public void execute(IngestDocument ingestDocument, BiConsumer handler) { + String pipelineName = ingestDocument.renderTemplate(this.pipelineName); Pipeline pipeline = ingestService.getPipeline(pipelineName); if (pipeline != null) { ingestDocument.executePipeline(pipeline, handler); @@ -52,7 +54,8 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { throw new UnsupportedOperationException("this method should not get executed"); } - Pipeline getPipeline(){ + Pipeline getPipeline(IngestDocument ingestDocument) { + String pipelineName = ingestDocument.renderTemplate(this.pipelineName); return ingestService.getPipeline(pipelineName); } @@ -61,7 +64,7 @@ public String getType() { return TYPE; } - String getPipelineName() { + TemplateScript.Factory getPipelineName() { return pipelineName; } @@ -76,9 +79,10 @@ public Factory(IngestService ingestService) { @Override public PipelineProcessor create(Map registry, String processorTag, Map config) throws Exception { - String pipeline = - ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "name"); - return new PipelineProcessor(processorTag, pipeline, ingestService); + String pipelineNameValue = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "name"); + TemplateScript.Factory pipelineName = + ConfigurationUtils.compileTemplate(TYPE, processorTag, "name", pipelineNameValue, ingestService.getScriptService()); + return new PipelineProcessor(processorTag, pipelineName, ingestService); } } } diff --git a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index edd236c8c4e76..4abaadb353c55 100644 --- a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -45,10 +45,10 @@ public final class TrackingResultProcessor implements Processor { public void execute(IngestDocument ingestDocument, BiConsumer handler) { if (actualProcessor instanceof PipelineProcessor) { PipelineProcessor pipelineProcessor = ((PipelineProcessor) actualProcessor); - Pipeline pipeline = pipelineProcessor.getPipeline(); + Pipeline pipeline = pipelineProcessor.getPipeline(ingestDocument); //runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument); - ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline(), (result, e) -> { + ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline(ingestDocument), (result, e) -> { // do nothing, let the tracking processors throw the exception while recording the path up to the failure if (e instanceof ElasticsearchException) { ElasticsearchException elasticsearchException = (ElasticsearchException) e; diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index b3db2300bb1f4..cdc4422baf51e 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -1109,7 +1109,7 @@ public void testStatName(){ PipelineProcessor pipelineProcessor = mock(PipelineProcessor.class); String pipelineName = randomAlphaOfLength(10); - when(pipelineProcessor.getPipelineName()).thenReturn(pipelineName); + when(pipelineProcessor.getPipelineName()).thenReturn(new TestTemplateService.MockTemplateScript.Factory(pipelineName)); name = PipelineProcessor.TYPE; when(pipelineProcessor.getType()).thenReturn(name); assertThat(IngestService.getProcessorName(pipelineProcessor), equalTo(name + ":" + pipelineName)); diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java index 4f36727c7ac30..e43cb5c701b34 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.ingest; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ESTestCase; import java.util.Arrays; @@ -37,7 +38,7 @@ public class PipelineProcessorTests extends ESTestCase { public void testExecutesPipeline() throws Exception { String pipelineId = "pipeline"; - IngestService ingestService = mock(IngestService.class); + IngestService ingestService = createIngestService(); CompletableFuture invoked = new CompletableFuture<>(); IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); Pipeline pipeline = new Pipeline( @@ -69,7 +70,7 @@ public String getTag() { } public void testThrowsOnMissingPipeline() throws Exception { - IngestService ingestService = mock(IngestService.class); + IngestService ingestService = createIngestService(); IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); Map config = new HashMap<>(); @@ -85,7 +86,7 @@ public void testThrowsOnMissingPipeline() throws Exception { public void testThrowsOnRecursivePipelineInvocations() throws Exception { String innerPipelineId = "inner"; String outerPipelineId = "outer"; - IngestService ingestService = mock(IngestService.class); + IngestService ingestService = createIngestService(); IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); Map outerConfig = new HashMap<>(); outerConfig.put("name", innerPipelineId); @@ -113,7 +114,7 @@ public void testThrowsOnRecursivePipelineInvocations() throws Exception { public void testAllowsRepeatedPipelineInvocations() throws Exception { String innerPipelineId = "inner"; - IngestService ingestService = mock(IngestService.class); + IngestService ingestService = createIngestService(); IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); Map outerConfig = new HashMap<>(); outerConfig.put("name", innerPipelineId); @@ -131,7 +132,7 @@ public void testPipelineProcessorWithPipelineChain() throws Exception { String pipeline1Id = "pipeline1"; String pipeline2Id = "pipeline2"; String pipeline3Id = "pipeline3"; - IngestService ingestService = mock(IngestService.class); + IngestService ingestService = createIngestService(); PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); Map pipeline1ProcessorConfig = new HashMap<>(); @@ -203,4 +204,11 @@ pipeline3Id, null, null, new CompoundProcessor( assertThat(pipeline2Stats.getIngestFailedCount(), equalTo(0L)); assertThat(pipeline3Stats.getIngestFailedCount(), equalTo(1L)); } + + private static IngestService createIngestService() { + IngestService ingestService = mock(IngestService.class); + ScriptService scriptService = mock(ScriptService.class); + when(ingestService.getScriptService()).thenReturn(scriptService); + return ingestService; + } } From bb8f59c1902d94521fba1d04a9358ac0c8ea33bb Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 13 Nov 2019 16:22:21 +0100 Subject: [PATCH 2/6] fixed tests --- .../ingest/PipelineProcessorTests.java | 2 +- .../ingest/TrackingResultProcessorTests.java | 13 +++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java index e43cb5c701b34..aebcc28e77d5e 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java @@ -205,7 +205,7 @@ pipeline3Id, null, null, new CompoundProcessor( assertThat(pipeline3Stats.getIngestFailedCount(), equalTo(1L)); } - private static IngestService createIngestService() { + static IngestService createIngestService() { IngestService ingestService = mock(IngestService.class); ScriptService scriptService = mock(ScriptService.class); when(ingestService.getScriptService()).thenReturn(scriptService); diff --git a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java index 5a840b9e4bb48..00cfed6b5d567 100644 --- a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java @@ -41,6 +41,7 @@ import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_MESSAGE_FIELD; import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD; import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD; +import static org.elasticsearch.ingest.PipelineProcessorTests.createIngestService; import static org.elasticsearch.ingest.TrackingResultProcessor.decorate; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; @@ -196,7 +197,7 @@ public void testActualCompoundProcessorWithFalseConditional() throws Exception { public void testActualPipelineProcessor() throws Exception { String pipelineId = "pipeline1"; - IngestService ingestService = mock(IngestService.class); + IngestService ingestService = createIngestService(); Map pipelineConfig = new HashMap<>(); pipelineConfig.put("name", pipelineId); PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); @@ -241,7 +242,7 @@ pipelineId, null, null, new CompoundProcessor( public void testActualPipelineProcessorWithTrueConditional() throws Exception { String pipelineId1 = "pipeline1"; String pipelineId2 = "pipeline2"; - IngestService ingestService = mock(IngestService.class); + IngestService ingestService = createIngestService(); Map pipelineConfig0 = new HashMap<>(); pipelineConfig0.put("name", pipelineId1); Map pipelineConfig1 = new HashMap<>(); @@ -309,7 +310,7 @@ pipelineId2, null, null, new CompoundProcessor( public void testActualPipelineProcessorWithFalseConditional() throws Exception { String pipelineId1 = "pipeline1"; String pipelineId2 = "pipeline2"; - IngestService ingestService = mock(IngestService.class); + IngestService ingestService = createIngestService(); Map pipelineConfig0 = new HashMap<>(); pipelineConfig0.put("name", pipelineId1); Map pipelineConfig1 = new HashMap<>(); @@ -378,7 +379,7 @@ public void testActualPipelineProcessorWithHandledFailure() throws Exception { RuntimeException exception = new RuntimeException("processor failed"); String pipelineId = "pipeline1"; - IngestService ingestService = mock(IngestService.class); + IngestService ingestService = createIngestService(); Map pipelineConfig = new HashMap<>(); pipelineConfig.put("name", pipelineId); PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); @@ -431,7 +432,7 @@ pipelineId, null, null, new CompoundProcessor( public void testActualPipelineProcessorWithCycle() throws Exception { String pipelineId1 = "pipeline1"; String pipelineId2 = "pipeline2"; - IngestService ingestService = mock(IngestService.class); + IngestService ingestService = createIngestService(); Map pipelineConfig0 = new HashMap<>(); pipelineConfig0.put("name", pipelineId1); Map pipelineConfig1 = new HashMap<>(); @@ -463,7 +464,7 @@ public void testActualPipelineProcessorWithCycle() throws Exception { public void testActualPipelineProcessorRepeatedInvocation() throws Exception { String pipelineId = "pipeline1"; - IngestService ingestService = mock(IngestService.class); + IngestService ingestService = createIngestService(); Map pipelineConfig = new HashMap<>(); pipelineConfig.put("name", pipelineId); PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); From f02e538a17ddd0a21b68c5a23e536eb9ee578392 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 13 Nov 2019 17:42:58 +0100 Subject: [PATCH 3/6] fixed checkstyle violation --- .../org/elasticsearch/ingest/TrackingResultProcessorTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java index 00cfed6b5d567..cb5890e53ec53 100644 --- a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java @@ -49,7 +49,6 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; From bc57fb1003ac3eb8258e5887f893576470b2ee16 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 27 Nov 2019 09:03:13 +0100 Subject: [PATCH 4/6] iter --- modules/ingest-common/build.gradle | 1 + .../main/java/org/elasticsearch/ingest/PipelineProcessor.java | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/ingest-common/build.gradle b/modules/ingest-common/build.gradle index efe72087440a7..6590bdc1c52ef 100644 --- a/modules/ingest-common/build.gradle +++ b/modules/ingest-common/build.gradle @@ -31,5 +31,6 @@ dependencies { testClusters.integTest { // Needed in order to test ingest pipeline templating: + // (this is because the integTest node is not using default distribution, but only the minimal number of required modules) module file(project(':modules:lang-mustache').tasks.bundlePlugin.archiveFile) } diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java index 634de19695ee0..4410449ad7ca2 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java @@ -79,9 +79,8 @@ public Factory(IngestService ingestService) { @Override public PipelineProcessor create(Map registry, String processorTag, Map config) throws Exception { - String pipelineNameValue = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "name"); TemplateScript.Factory pipelineName = - ConfigurationUtils.compileTemplate(TYPE, processorTag, "name", pipelineNameValue, ingestService.getScriptService()); + ConfigurationUtils.readTemplateProperty(TYPE, processorTag, config, "name", ingestService.getScriptService()); return new PipelineProcessor(processorTag, pipelineName, ingestService); } } From 17987ca4f3c4a6e3407eb384c8684e9acbd6b89e Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 27 Nov 2019 10:40:56 +0100 Subject: [PATCH 5/6] fixed test --- .../rest-api-spec/test/ingest/210_pipeline_processor.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml index d007125f673b8..76dbc180fa0e5 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml @@ -200,5 +200,5 @@ teardown: { "org": "legal" } - - match: { error.root_cause.0.type: "exception" } + - match: { error.root_cause.0.type: "ingest_processor_exception" } - match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Pipeline processor configured for non-existent pipeline [legal-department]" } From eace6523f40c9e3322a1e2114d89d9d8346a66c6 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 27 Nov 2019 12:10:09 +0100 Subject: [PATCH 6/6] rename --- .../elasticsearch/ingest/IngestService.java | 2 +- .../ingest/PipelineProcessor.java | 18 +++++++++--------- .../ingest/IngestServiceTests.java | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 8898a690c06d4..56b899f068b1c 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -469,7 +469,7 @@ static String getProcessorName(Processor processor) { sb.append(processor.getType()); if(processor instanceof PipelineProcessor){ - String pipelineName = ((PipelineProcessor) processor).getPipelineName().newInstance(Map.of()).execute(); + String pipelineName = ((PipelineProcessor) processor).getPipelineTemplate().newInstance(Map.of()).execute(); sb.append(":"); sb.append(pipelineName); } diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java index 4410449ad7ca2..be02fe24752c1 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java @@ -28,18 +28,18 @@ public class PipelineProcessor extends AbstractProcessor { public static final String TYPE = "pipeline"; - private final TemplateScript.Factory pipelineName; + private final TemplateScript.Factory pipelineTemplate; private final IngestService ingestService; - private PipelineProcessor(String tag, TemplateScript.Factory pipelineName, IngestService ingestService) { + private PipelineProcessor(String tag, TemplateScript.Factory pipelineTemplate, IngestService ingestService) { super(tag); - this.pipelineName = pipelineName; + this.pipelineTemplate = pipelineTemplate; this.ingestService = ingestService; } @Override public void execute(IngestDocument ingestDocument, BiConsumer handler) { - String pipelineName = ingestDocument.renderTemplate(this.pipelineName); + String pipelineName = ingestDocument.renderTemplate(this.pipelineTemplate); Pipeline pipeline = ingestService.getPipeline(pipelineName); if (pipeline != null) { ingestDocument.executePipeline(pipeline, handler); @@ -55,7 +55,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { } Pipeline getPipeline(IngestDocument ingestDocument) { - String pipelineName = ingestDocument.renderTemplate(this.pipelineName); + String pipelineName = ingestDocument.renderTemplate(this.pipelineTemplate); return ingestService.getPipeline(pipelineName); } @@ -64,8 +64,8 @@ public String getType() { return TYPE; } - TemplateScript.Factory getPipelineName() { - return pipelineName; + TemplateScript.Factory getPipelineTemplate() { + return pipelineTemplate; } public static final class Factory implements Processor.Factory { @@ -79,9 +79,9 @@ public Factory(IngestService ingestService) { @Override public PipelineProcessor create(Map registry, String processorTag, Map config) throws Exception { - TemplateScript.Factory pipelineName = + TemplateScript.Factory pipelineTemplate = ConfigurationUtils.readTemplateProperty(TYPE, processorTag, config, "name", ingestService.getScriptService()); - return new PipelineProcessor(processorTag, pipelineName, ingestService); + return new PipelineProcessor(processorTag, pipelineTemplate, ingestService); } } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 1681f97f8d416..5400956d076c3 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -1115,7 +1115,7 @@ public void testStatName(){ PipelineProcessor pipelineProcessor = mock(PipelineProcessor.class); String pipelineName = randomAlphaOfLength(10); - when(pipelineProcessor.getPipelineName()).thenReturn(new TestTemplateService.MockTemplateScript.Factory(pipelineName)); + when(pipelineProcessor.getPipelineTemplate()).thenReturn(new TestTemplateService.MockTemplateScript.Factory(pipelineName)); name = PipelineProcessor.TYPE; when(pipelineProcessor.getType()).thenReturn(name); assertThat(IngestService.getProcessorName(pipelineProcessor), equalTo(name + ":" + pipelineName));