Skip to content

Commit

Permalink
[7.x] Fix ingest simulate verbose on failure with conditional (#56478) (
Browse files Browse the repository at this point in the history
#56635)

If a conditional is added to a processor, and that processor fails, and 
that processor has an on_failure handler, the full trace of all of the 
executed processors may not be displayed in simulate verbose. The 
information is correct, but misses displaying some of the steps used 
to get there.

This happens because a processor that is conditional processor is a 
wrapper around the real processor and a processor with an on_failure 
handler is also a wrapper around the processor(s). When decorating for 
simulation we treat compound processor specially, but if a compound processor
is wrapped by a conditional processor that compound processor's processors 
can be missed for decoration resulting in the missing displayed steps.

The fix to this is to treat the conditional processor specially and
explicitly seperate it from the processor it is wrapping. This requires
us to keep track of 2 processors a possible conditional processor and
the actual processor it may be wrapping.

related: #56004
  • Loading branch information
jakelandis authored May 12, 2020
1 parent cf76a93 commit a56fb61
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -758,3 +758,73 @@ teardown:
- match: { docs.0.processor_results.2.doc._source.pipeline1: true }
- match: { docs.0.processor_results.2.doc._source.pipeline2: true }

---
"Test verbose simulate with true conditional and on failure":
- do:
ingest.simulate:
verbose: true
body: >
{
"pipeline": {
"processors": [
{
"rename": {
"tag": "gunna_fail",
"if": "true",
"field": "foo1",
"target_field": "fieldA",
"on_failure": [
{
"set": {
"field": "failed1",
"value": "failed1",
"tag": "failed1"
}
},
{
"rename": {
"tag": "gunna_fail_again",
"if": "true",
"field": "foo2",
"target_field": "fieldA",
"on_failure": [
{
"set": {
"field": "failed2",
"value": "failed2",
"tag": "failed2"
}
}
]
}
}
]
}
}
]
},
"docs": [
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "bar"
}
}
]
}
- length: { docs: 1 }
- length: { docs.0.processor_results: 4 }
- match: { docs.0.processor_results.0.tag: "gunna_fail" }
- match: { docs.0.processor_results.0.error.reason: "field [foo1] doesn't exist" }
- match: { docs.0.processor_results.1.tag: "failed1" }
- match: { docs.0.processor_results.1.doc._source.failed1: "failed1" }
- match: { docs.0.processor_results.1.doc._ingest.on_failure_processor_tag: "gunna_fail" }
- match: { docs.0.processor_results.2.tag: "gunna_fail_again" }
- match: { docs.0.processor_results.2.error.reason: "field [foo2] doesn't exist" }
- match: { docs.0.processor_results.3.tag: "failed2" }
- match: { docs.0.processor_results.3.doc._source.failed1: "failed1" }
- match: { docs.0.processor_results.3.doc._source.failed2: "failed2" }
- match: { docs.0.processor_results.3.doc._ingest.on_failure_processor_tag: "gunna_fail_again" }


Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean v
BiConsumer<SimulateDocumentResult, Exception> handler) {
if (verbose) {
List<SimulateProcessorResult> processorResultList = new CopyOnWriteArrayList<>();
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList);
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList);
Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
verbosePipelineProcessor);
ingestDocument.executePipeline(verbosePipeline, (result, e) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,27 @@
public final class TrackingResultProcessor implements Processor {

private final Processor actualProcessor;
private final ConditionalProcessor conditionalProcessor;
private final List<SimulateProcessorResult> processorResultList;
private final boolean ignoreFailure;

TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, List<SimulateProcessorResult> processorResultList) {
TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, ConditionalProcessor conditionalProcessor,
List<SimulateProcessorResult> processorResultList) {
this.ignoreFailure = ignoreFailure;
this.processorResultList = processorResultList;
this.actualProcessor = actualProcessor;
this.conditionalProcessor = conditionalProcessor;
}

@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
if (conditionalProcessor != null ) {
if (conditionalProcessor.evaluate(ingestDocument) == false) {
handler.accept(ingestDocument, null);
return;
}
}

if (actualProcessor instanceof PipelineProcessor) {
PipelineProcessor pipelineProcessor = ((PipelineProcessor) actualProcessor);
Pipeline pipeline = pipelineProcessor.getPipeline(ingestDocument);
Expand All @@ -64,7 +74,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
}
} else {
//now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList);
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList);
Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
verbosePipelineProcessor);
ingestDocument.executePipeline(verbosePipeline, handler);
Expand All @@ -73,36 +83,20 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
return;
}

final Processor processor;
if (actualProcessor instanceof ConditionalProcessor) {
ConditionalProcessor conditionalProcessor = (ConditionalProcessor) actualProcessor;
if (conditionalProcessor.evaluate(ingestDocument) == false) {
handler.accept(ingestDocument, null);
return;
}
if (conditionalProcessor.getInnerProcessor() instanceof PipelineProcessor) {
processor = conditionalProcessor.getInnerProcessor();
} else {
processor = actualProcessor;
}
} else {
processor = actualProcessor;
}

processor.execute(ingestDocument, (result, e) -> {
actualProcessor.execute(ingestDocument, (result, e) -> {
if (e != null) {
if (ignoreFailure) {
processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument), e));
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument), e));
} else {
processorResultList.add(new SimulateProcessorResult(processor.getTag(), e));
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), e));
}
handler.accept(null, e);
} else {
if (result != null) {
processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument)));
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument)));
handler.accept(result, null);
} else {
processorResultList.add(new SimulateProcessorResult(processor.getTag()));
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag()));
handler.accept(null, null);
}
}
Expand All @@ -124,21 +118,34 @@ public String getTag() {
return actualProcessor.getTag();
}

public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List<SimulateProcessorResult> processorResultList) {
List<Processor> processors = new ArrayList<>(compoundProcessor.getProcessors().size());
public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, ConditionalProcessor parentCondition,
List<SimulateProcessorResult> processorResultList) {
List<Processor> processors = new ArrayList<>();
for (Processor processor : compoundProcessor.getProcessors()) {
ConditionalProcessor conditionalProcessor = parentCondition;
if (processor instanceof ConditionalProcessor) {
conditionalProcessor = (ConditionalProcessor) processor;
processor = conditionalProcessor.getInnerProcessor();
}
if (processor instanceof CompoundProcessor) {
processors.add(decorate((CompoundProcessor) processor, processorResultList));
processors.add(decorate((CompoundProcessor) processor, conditionalProcessor, processorResultList));
} else {
processors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList));
processors.add(
new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, conditionalProcessor, processorResultList));
}
}
List<Processor> onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size());
for (Processor processor : compoundProcessor.getOnFailureProcessors()) {
ConditionalProcessor conditionalProcessor = null;
if (processor instanceof ConditionalProcessor) {
conditionalProcessor = (ConditionalProcessor) processor;
processor = conditionalProcessor.getInnerProcessor();
}
if (processor instanceof CompoundProcessor) {
onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList));
onFailureProcessors.add(decorate((CompoundProcessor) processor, conditionalProcessor, processorResultList));
} else {
onFailureProcessors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList));
onFailureProcessors.add(
new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, conditionalProcessor, processorResultList));
}
}
return new CompoundProcessor(compoundProcessor.isIgnoreFailure(), processors, onFailureProcessors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void init() {

public void testActualProcessor() throws Exception {
TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {});
TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, resultList);
TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, null, resultList);
trackingProcessor.execute(ingestDocument, (result, e) -> {});

SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
Expand All @@ -81,7 +81,7 @@ public void testActualCompoundProcessorWithoutOnFailure() throws Exception {
RuntimeException exception = new RuntimeException("processor failed");
TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; });
CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor);
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);

Exception[] holder = new Exception[1];
trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
Expand All @@ -104,7 +104,7 @@ public void testActualCompoundProcessorWithOnFailure() throws Exception {
Arrays.asList(failProcessor, onFailureProcessor),
Arrays.asList(onFailureProcessor, failProcessor))),
Arrays.asList(onFailureProcessor));
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
trackingProcessor.execute(ingestDocument, (result, e) -> {});

SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument);
Expand Down Expand Up @@ -137,12 +137,53 @@ public void testActualCompoundProcessorWithOnFailure() throws Exception {
assertThat(resultList.get(3).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag()));
}

public void testActualCompoundProcessorWithOnFailureAndTrueCondition() throws Exception {
String scriptName = "conditionalScript";
ScriptService scriptService = new ScriptService(Settings.builder().build(), Collections.singletonMap(Script.DEFAULT_SCRIPT_LANG,
new MockScriptEngine(Script.DEFAULT_SCRIPT_LANG, Collections.singletonMap(scriptName, ctx -> true), Collections.emptyMap())),
new HashMap<>(ScriptModule.CORE_CONTEXTS)
);
RuntimeException exception = new RuntimeException("fail");
TestProcessor failProcessor = new TestProcessor("fail", "test", exception);
ConditionalProcessor conditionalProcessor = new ConditionalProcessor(
randomAlphaOfLength(10),
new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptName, Collections.emptyMap()), scriptService,
failProcessor);
TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {});
CompoundProcessor actualProcessor =
new CompoundProcessor(false,
Arrays.asList(conditionalProcessor),
Arrays.asList(onFailureProcessor));
CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
trackingProcessor.execute(ingestDocument, (result, e) -> {
});

SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument);
SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument);

assertThat(failProcessor.getInvokedCounter(), equalTo(1));
assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1));
assertThat(resultList.size(), equalTo(2));

assertThat(resultList.get(0).getIngestDocument(), nullValue());
assertThat(resultList.get(0).getFailure(), equalTo(exception));
assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag()));

Map<String, Object> metadata = resultList.get(1).getIngestDocument().getIngestMetadata();
assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail"));
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test"));
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail"));
assertThat(resultList.get(1).getFailure(), nullValue());
assertThat(resultList.get(1).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag()));
}


public void testActualCompoundProcessorWithIgnoreFailure() throws Exception {
RuntimeException exception = new RuntimeException("processor failed");
TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; });
CompoundProcessor actualProcessor = new CompoundProcessor(true, Collections.singletonList(testProcessor),
Collections.emptyList());
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);

trackingProcessor.execute(ingestDocument, (result, e) -> {});

Expand Down Expand Up @@ -173,7 +214,7 @@ public void testActualCompoundProcessorWithFalseConditional() throws Exception {
new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key2, randomInt()); })),
new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); }));

CompoundProcessor trackingProcessor = decorate(compoundProcessor, resultList);
CompoundProcessor trackingProcessor = decorate(compoundProcessor, null, resultList);
trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getTag(), ingestDocument);

Expand Down Expand Up @@ -215,7 +256,7 @@ pipelineId, null, null, new CompoundProcessor(
PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig);
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);

CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);

trackingProcessor.execute(ingestDocument, (result, e) -> {});

Expand Down Expand Up @@ -282,7 +323,7 @@ pipelineId2, null, null, new CompoundProcessor(
PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0);
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);

CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);

trackingProcessor.execute(ingestDocument, (result, e) -> {});

Expand Down Expand Up @@ -351,7 +392,7 @@ pipelineId2, null, null, new CompoundProcessor(
PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0);
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);

CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);

trackingProcessor.execute(ingestDocument, (result, e) -> {});

Expand Down Expand Up @@ -404,7 +445,7 @@ pipelineId, null, null, new CompoundProcessor(
PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig);
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);

CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);

trackingProcessor.execute(ingestDocument, (result, e) -> {});

Expand Down Expand Up @@ -455,7 +496,7 @@ public void testActualPipelineProcessorWithCycle() throws Exception {
PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0);
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);

CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);

Exception[] holder = new Exception[1];
trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
Expand All @@ -481,7 +522,7 @@ pipelineId, null, null, new CompoundProcessor(

CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor, pipelineProcessor);

CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);

trackingProcessor.execute(ingestDocument, (result, e) -> {});

Expand All @@ -504,4 +545,6 @@ pipelineId, null, null, new CompoundProcessor(
resultList.get(1).getIngestDocument().getSourceAndMetadata().get(key1));
}



}

0 comments on commit a56fb61

Please sign in to comment.