Skip to content

Commit

Permalink
Resolved the comments
Browse files Browse the repository at this point in the history
Signed-off-by: Rai <[email protected]>
  • Loading branch information
Rai committed Aug 28, 2024
1 parent 8a39f44 commit 99eaf02
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ void executeDocument(
}

public void execute(SimulatePipelineRequest.Parsed request, ActionListener<SimulatePipelineResponse> listener) {

threadPool.executor(THREAD_POOL_NAME).execute(ActionRunnable.wrap(listener, l -> {
final AtomicInteger counter = new AtomicInteger();
final List<SimulateDocumentResult> responses = new CopyOnWriteArrayList<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.ingest.IngestPipelineValidator;
import org.opensearch.ingest.IngestService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -89,7 +88,7 @@ protected void doExecute(Task task, SimulatePipelineRequest request, ActionListe
return;
}

IngestPipelineValidator.validateIngestPipeline(simulateRequest.getPipeline(), ingestService.getClusterService());
ingestService.validateProcessorCountForIngestPipeline(simulateRequest.getPipeline());

executionService.execute(simulateRequest, listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.store.IndicesStore;
import org.opensearch.ingest.IngestPipelineValidator;
import org.opensearch.ingest.IngestService;
import org.opensearch.monitor.fs.FsHealthService;
import org.opensearch.monitor.fs.FsService;
import org.opensearch.monitor.jvm.JvmGcMonitorService;
Expand Down Expand Up @@ -406,7 +406,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ClusterService.USER_DEFINED_METADATA,
ClusterManagerService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, // deprecated
ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
IngestPipelineValidator.MAX_NUMBER_OF_INGEST_PROCESSORS,
IngestService.MAX_NUMBER_OF_INGEST_PROCESSORS,
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
Expand Down

This file was deleted.

37 changes: 35 additions & 2 deletions server/src/main/java/org/opensearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.metrics.OperationMetrics;
import org.opensearch.common.regex.Regex;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractRunnable;
Expand Down Expand Up @@ -107,6 +108,18 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge

public static final String INGEST_ORIGIN = "ingest";

/**
* Defines the limit for the number of processors which can run on a given document during ingestion.
*/
public static final Setting<Integer> MAX_NUMBER_OF_INGEST_PROCESSORS = Setting.intSetting(
"cluster.ingest.max_number_processors",
Integer.MAX_VALUE,
1,
Integer.MAX_VALUE,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private static final Logger logger = LogManager.getLogger(IngestService.class);

private final ClusterService clusterService;
Expand All @@ -123,6 +136,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
private final ClusterManagerTaskThrottler.ThrottlingKey putPipelineTaskKey;
private final ClusterManagerTaskThrottler.ThrottlingKey deletePipelineTaskKey;
private volatile ClusterState state;
private volatile int maxIngestProcessorCount;

public IngestService(
ClusterService clusterService,
Expand Down Expand Up @@ -156,6 +170,12 @@ public IngestService(
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
putPipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_PIPELINE_KEY, true);
deletePipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_PIPELINE_KEY, true);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_NUMBER_OF_INGEST_PROCESSORS, this::setMaxIngestProcessorCount);
setMaxIngestProcessorCount(clusterService.getClusterSettings().get(MAX_NUMBER_OF_INGEST_PROCESSORS));
}

private void setMaxIngestProcessorCount(Integer maxIngestProcessorCount) {
this.maxIngestProcessorCount = maxIngestProcessorCount;
}

private static Map<String, Processor.Factory> processorFactories(List<IngestPlugin> ingestPlugins, Processor.Parameters parameters) {
Expand Down Expand Up @@ -495,7 +515,7 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineReq
Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getMediaType()).v2();
Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories, scriptService);

IngestPipelineValidator.validateIngestPipeline(pipeline, clusterService);
validateProcessorCountForIngestPipeline(pipeline);

List<Exception> exceptions = new ArrayList<>();
for (Processor processor : pipeline.flattenAllProcessors()) {
Expand All @@ -510,6 +530,20 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineReq
ExceptionsHelper.rethrowAndSuppress(exceptions);
}

public void validateProcessorCountForIngestPipeline(Pipeline pipeline) {
List<Processor> processors = pipeline.getCompoundProcessor().getProcessors();

if (processors.size() > maxIngestProcessorCount) {
throw new IllegalStateException(
"Cannot use more than the maximum processors allowed. Number of processors being configured is ["
+ processors.size()
+ "] which exceeds the maximum allowed configuration of ["
+ maxIngestProcessorCount
+ "] processors."
);
}
}

public void executeBulkRequest(
int numberOfActionRequests,
Iterable<DocWriteRequest<?>> actionRequests,
Expand Down Expand Up @@ -1102,7 +1136,6 @@ void innerUpdatePipelines(IngestMetadata newIngestMetadata) {
processorFactories,
scriptService
);
IngestPipelineValidator.validateIngestPipeline(newPipeline, clusterService);
newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline));

if (previous == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.ingest.IngestPipelineValidator;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.junit.After;
Expand Down Expand Up @@ -38,16 +37,4 @@ public void testDeprecatedGetMasterServiceBWC() {
assertThat(masterService, equalTo(clusterManagerService));
}
}

public void testUpdateMaxIngestProcessorCountSetting() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);

// verify defaults
assertEquals(Integer.MAX_VALUE, clusterSettings.get(IngestPipelineValidator.MAX_NUMBER_OF_INGEST_PROCESSORS).intValue());

// verify update max processor
Settings newSettings = Settings.builder().put("cluster.ingest.max_number_processors", 3).build();
clusterSettings.applySettings(newSettings);
assertEquals(3, clusterSettings.get(IngestPipelineValidator.MAX_NUMBER_OF_INGEST_PROCESSORS).intValue());
}
}
13 changes: 13 additions & 0 deletions server/src/test/java/org/opensearch/ingest/IngestServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SetOnce;
import org.opensearch.common.metrics.OperationStats;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.xcontent.XContentType;
Expand Down Expand Up @@ -2058,6 +2059,18 @@ public void testPrepareBatches_different_index_pipeline() {
assertEquals(4, batches.size());
}

public void testUpdateMaxIngestProcessorCountSetting() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);

// verify defaults
assertEquals(Integer.MAX_VALUE, clusterSettings.get(IngestService.MAX_NUMBER_OF_INGEST_PROCESSORS).intValue());

// verify update max processor
Settings newSettings = Settings.builder().put("cluster.ingest.max_number_processors", 3).build();
clusterSettings.applySettings(newSettings);
assertEquals(3, clusterSettings.get(IngestService.MAX_NUMBER_OF_INGEST_PROCESSORS).intValue());
}

private IngestService.IndexRequestWrapper createIndexRequestWrapper(String index, List<String> pipelines) {
IndexRequest indexRequest = new IndexRequest(index);
return new IngestService.IndexRequestWrapper(0, indexRequest, pipelines, true);
Expand Down

0 comments on commit 99eaf02

Please sign in to comment.