Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow detectors to be stopped if underlying workflow is deleted. Don't allow them to then be started/edited (#810) #958

Merged
merged 1 commit into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,10 @@
this.alertsHistoryIndexPattern = alertsHistoryIndexPattern;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

Check warning on line 573 in src/main/java/org/opensearch/securityanalytics/model/Detector.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/model/Detector.java#L572-L573

Added lines #L572 - L573 were not covered by tests

public void setEnabledTime(Instant enabledTime) {
this.enabledTime = enabledTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@
import org.opensearch.securityanalytics.model.Detector;
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;
import org.opensearch.securityanalytics.util.DetectorIndices;
import org.opensearch.securityanalytics.util.ExceptionChecker;
import org.opensearch.securityanalytics.util.MonitorService;
import org.opensearch.securityanalytics.util.RuleTopicIndices;
import org.opensearch.securityanalytics.util.SecurityAnalyticsException;
import org.opensearch.securityanalytics.util.ThrowableCheckingPredicates;
import org.opensearch.securityanalytics.util.WorkflowService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -61,6 +63,11 @@
public class TransportDeleteDetectorAction extends HandledTransportAction<DeleteDetectorRequest, DeleteDetectorResponse> {

private static final Logger log = LogManager.getLogger(TransportDeleteDetectorAction.class);
private static final List<ThrowableCheckingPredicates> ACCEPTABLE_ENTITY_MISSING_THROWABLE_MATCHERS = List.of(

Check warning on line 66 in src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java#L66

Added line #L66 was not covered by tests
ThrowableCheckingPredicates.MONITOR_NOT_FOUND,
ThrowableCheckingPredicates.WORKFLOW_NOT_FOUND,
ThrowableCheckingPredicates.ALERTING_CONFIG_INDEX_NOT_FOUND
);

private final Client client;

Expand All @@ -83,9 +90,13 @@

private final DetectorIndices detectorIndices;

private final ExceptionChecker exceptionChecker;

@Inject
public TransportDeleteDetectorAction(TransportService transportService, IndexTemplateManager indexTemplateManager, Client client, ActionFilters actionFilters, NamedXContentRegistry xContentRegistry, RuleTopicIndices ruleTopicIndices, DetectorIndices detectorIndices, ClusterService clusterService,
Settings settings) {
public TransportDeleteDetectorAction(TransportService transportService, IndexTemplateManager indexTemplateManager, Client client,
ActionFilters actionFilters, NamedXContentRegistry xContentRegistry, RuleTopicIndices ruleTopicIndices,
DetectorIndices detectorIndices, ClusterService clusterService, Settings settings,
ExceptionChecker exceptionChecker) {
super(DeleteDetectorAction.NAME, transportService, actionFilters, DeleteDetectorRequest::new);
this.client = client;
this.ruleTopicIndices = ruleTopicIndices;
Expand All @@ -100,6 +111,7 @@

this.enabledWorkflowUsage = SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE.get(this.settings);
this.clusterService.getClusterSettings().addSettingsUpdateConsumer(SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE, this::setEnabledWorkflowUsage);
this.exceptionChecker = exceptionChecker;

Check warning on line 114 in src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java#L114

Added line #L114 was not covered by tests
}

@Override
Expand Down Expand Up @@ -204,7 +216,8 @@

@Override
public void onFailure(Exception e) {
if (isOnlyWorkflowOrMonitorOrIndexMissingExceptionThrownByGroupedActionListener(e, detector.getId())) {
if (exceptionChecker.doesGroupedActionListenerExceptionMatch(e, ACCEPTABLE_ENTITY_MISSING_THROWABLE_MATCHERS)) {
logAcceptableEntityMissingException(e, detector.getId());

Check warning on line 220 in src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java#L220

Added line #L220 was not covered by tests
deleteDetectorFromConfig(detector.getId(), request.getRefreshPolicy());
} else {
log.error(String.format(Locale.ROOT, "Failed to delete detector %s", detector.getId()), e);
Expand Down Expand Up @@ -243,7 +256,8 @@

private void handleDeleteWorkflowFailure(final String detectorId, final Exception deleteWorkflowException,
final ActionListener<AcknowledgedResponse> actionListener) {
if (isOnlyWorkflowOrMonitorOrIndexMissingExceptionThrownByGroupedActionListener(deleteWorkflowException, detectorId)) {
if (exceptionChecker.doesGroupedActionListenerExceptionMatch(deleteWorkflowException, ACCEPTABLE_ENTITY_MISSING_THROWABLE_MATCHERS)) {
logAcceptableEntityMissingException(deleteWorkflowException, detectorId);

Check warning on line 260 in src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java#L260

Added line #L260 was not covered by tests
actionListener.onResponse(new AcknowledgedResponse(true));
} else {
actionListener.onFailure(deleteWorkflowException);
Expand Down Expand Up @@ -305,39 +319,12 @@
}
}));
}

private boolean isOnlyWorkflowOrMonitorOrIndexMissingExceptionThrownByGroupedActionListener(
Exception ex,
String detectorId
) {
// grouped action listener listens on mutliple listeners but throws only one exception. If multiple
// listeners fail the other exceptions are added as suppressed exceptions to the first failure.
int len = ex.getSuppressed().length;
for (int i = 0; i <= len; i++) {
Throwable e = i == len ? ex : ex.getSuppressed()[i];
if (isMonitorNotFoundException(e) || isWorkflowNotFoundException(e) || isAlertingConfigIndexNotFoundException(e)) {
log.error(
String.format(Locale.ROOT, "Workflow, monitor, or jobs index already deleted." +
" Proceeding with detector %s deletion", detectorId),
e);
} else {
return false;
}
}
return true;
}
}

private boolean isMonitorNotFoundException(final Throwable e) {
return e.getMessage().matches("(.*)Monitor(.*) is not found(.*)");
}

private boolean isWorkflowNotFoundException(final Throwable e) {
return e.getMessage().matches("(.*)Workflow(.*) not found(.*)");
}

private boolean isAlertingConfigIndexNotFoundException(final Throwable e) {
return e.getMessage().contains("Configured indices are not found: [.opendistro-alerting-config]");
private void logAcceptableEntityMissingException(final Exception e, final String detectorId) {
final String errorMsg = String.format(Locale.ROOT, "Workflow, monitor, or jobs index already deleted." +

Check warning on line 325 in src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java#L325

Added line #L325 was not covered by tests
" Proceeding with detector %s deletion", detectorId);
log.error(errorMsg, e);

Check warning on line 327 in src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java#L327

Added line #L327 was not covered by tests
}

private void setEnabledWorkflowUsage(boolean enabledWorkflowUsage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,13 @@
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;
import org.opensearch.securityanalytics.util.DetectorIndices;
import org.opensearch.securityanalytics.util.DetectorUtils;
import org.opensearch.securityanalytics.util.ExceptionChecker;
import org.opensearch.securityanalytics.util.IndexUtils;
import org.opensearch.securityanalytics.util.MonitorService;
import org.opensearch.securityanalytics.util.RuleIndices;
import org.opensearch.securityanalytics.util.RuleTopicIndices;
import org.opensearch.securityanalytics.util.SecurityAnalyticsException;
import org.opensearch.securityanalytics.util.ThrowableCheckingPredicates;
import org.opensearch.securityanalytics.util.WorkflowService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -155,6 +157,8 @@
private final MonitorService monitorService;
private final IndexNameExpressionResolver indexNameExpressionResolver;

private final ExceptionChecker exceptionChecker;

private final TimeValue indexTimeout;
@Inject
public TransportIndexDetectorAction(TransportService transportService,
Expand All @@ -169,7 +173,8 @@
Settings settings,
NamedWriteableRegistry namedWriteableRegistry,
LogTypeService logTypeService,
IndexNameExpressionResolver indexNameExpressionResolver) {
IndexNameExpressionResolver indexNameExpressionResolver,
ExceptionChecker exceptionChecker) {
super(IndexDetectorAction.NAME, transportService, actionFilters, IndexDetectorRequest::new);
this.client = client;
this.xContentRegistry = xContentRegistry;
Expand All @@ -191,6 +196,7 @@

this.clusterService.getClusterSettings().addSettingsUpdateConsumer(SecurityAnalyticsSettings.FILTER_BY_BACKEND_ROLES, this::setFilterByEnabled);
this.clusterService.getClusterSettings().addSettingsUpdateConsumer(SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE, this::setEnabledWorkflowUsage);
this.exceptionChecker = exceptionChecker;

Check warning on line 199 in src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java#L199

Added line #L199 was not covered by tests
}

@Override
Expand Down Expand Up @@ -616,8 +622,7 @@
}
@Override
public void onFailure(Exception e) {
log.error("Failed to update the workflow");
listener.onFailure(e);
handleUpsertWorkflowFailure(e, listener, detector, monitorsToBeDeleted, refreshPolicy, updatedMonitors);

Check warning on line 625 in src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java#L625

Added line #L625 was not covered by tests
}
});
}
Expand Down Expand Up @@ -674,6 +679,25 @@
return new IndexMonitorRequest(monitorId, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, refreshPolicy, restMethod, monitor, null);
}

private void handleUpsertWorkflowFailure(final Exception e, final ActionListener<List<IndexMonitorResponse>> listener,
final Detector detector, final List<String> monitorsToBeDeleted,
final RefreshPolicy refreshPolicy, final List<IndexMonitorResponse> updatedMonitors) {
if (exceptionChecker.doesGroupedActionListenerExceptionMatch(e, List.of(ThrowableCheckingPredicates.WORKFLOW_NOT_FOUND))) {
if (detector.getEnabled()) {
final String errorMessage = String.format("Underlying workflow associated with detector %s not found. " +
"Delete and recreate the detector to restore functionality.", detector.getName());
log.error(errorMessage);
listener.onFailure(new SecurityAnalyticsException(errorMessage, RestStatus.BAD_REQUEST, e));
} else {
log.error("Underlying workflow associated with detector {} not found. Proceeding to disable detector.", detector.getName());
deleteMonitorStep(monitorsToBeDeleted, refreshPolicy, updatedMonitors, listener);

Check warning on line 693 in src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java#L687-L693

Added lines #L687 - L693 were not covered by tests
}
} else {
log.error("Failed to update the workflow");
listener.onFailure(e);

Check warning on line 697 in src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java#L696-L697

Added lines #L696 - L697 were not covered by tests
}
}

Check warning on line 699 in src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java#L699

Added line #L699 was not covered by tests

/**
* Creates doc level monitor which generates per document alerts for the findings of the bucket level delegate monitors in a workflow.
* This monitor has match all query applied to generate the alerts per each finding doc.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.securityanalytics.util;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class ExceptionChecker {

public boolean doesGroupedActionListenerExceptionMatch(final Exception ex, final List<ThrowableCheckingPredicates> exceptionMatchers) {
// grouped action listener listens on multiple listeners but throws only one exception. If multiple
// listeners fail the other exceptions are added as suppressed exceptions to the first failure.
return Stream.concat(Arrays.stream(ex.getSuppressed()), Stream.of(ex))
.allMatch(throwable -> doesExceptionMatch(throwable, exceptionMatchers));
}

private boolean doesExceptionMatch(final Throwable throwable, final List<ThrowableCheckingPredicates> exceptionMatchers) {
return exceptionMatchers.stream()
.map(ThrowableCheckingPredicates::getMatcherPredicate)
.anyMatch(matcher -> matcher.test(throwable));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.securityanalytics.util;

import java.util.function.Predicate;

public enum ThrowableCheckingPredicates {
MONITOR_NOT_FOUND(ThrowableCheckingPredicates::isMonitorNotFoundException),
WORKFLOW_NOT_FOUND(ThrowableCheckingPredicates::isWorkflowNotFoundException),
ALERTING_CONFIG_INDEX_NOT_FOUND(ThrowableCheckingPredicates::isAlertingConfigIndexNotFoundException);

private final Predicate<Throwable> matcherPredicate;
ThrowableCheckingPredicates(final Predicate<Throwable> matcherPredicate) {
this.matcherPredicate = matcherPredicate;
}

public Predicate<Throwable> getMatcherPredicate() {
return this.matcherPredicate;
}

private static boolean isMonitorNotFoundException(final Throwable e) {
return e.getMessage().matches("(.*)Monitor(.*) is not found(.*)");
}

public static boolean isWorkflowNotFoundException(final Throwable e) {
return e.getMessage().matches("(.*)Workflow(.*) not found(.*)");
}

public static boolean isAlertingConfigIndexNotFoundException(final Throwable e) {
return e.getMessage().contains("Configured indices are not found: [.opendistro-alerting-config]");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,50 @@ public void testUpdateADetectorWithIndexNotExists() throws IOException {
}
}

public void testDisableEnableADetectorWithWorkflowNotExists() throws IOException {
final String index = createTestIndex(randomIndex(), windowsIndexMapping());

// Execute CreateMappingsAction to add alias mapping for index
final Request createMappingRequest = new Request("POST", SecurityAnalyticsPlugin.MAPPER_BASE_URI);
// both req params and req body are supported
createMappingRequest.setJsonEntity(
"{ \"index_name\":\"" + index + "\"," +
" \"rule_topic\":\"" + randomDetectorType() + "\", " +
" \"partial\":true" +
"}"
);

final Response createMappingResponse = client().performRequest(createMappingRequest);
assertEquals(HttpStatus.SC_OK, createMappingResponse.getStatusLine().getStatusCode());

final Detector detector = randomDetector(getRandomPrePackagedRules());
final Response createResponse = makeRequest(client(), "POST", SecurityAnalyticsPlugin.DETECTOR_BASE_URI, Collections.emptyMap(), toHttpEntity(detector));
Assert.assertEquals("Create detector failed", RestStatus.CREATED, restStatus(createResponse));

final Map<String, Object> createResponseAsMap = asMap(createResponse);
final String detectorId = createResponseAsMap.get("_id").toString();

final Map<String, Object> detectorSourceAsMap = getDetectorSourceAsMap(detectorId);
final String workflowId = ((List<String>) detectorSourceAsMap.get("workflow_ids")).get(0);

final Response deleteWorkflowResponse = deleteAlertingWorkflow(workflowId);
assertEquals(200, deleteWorkflowResponse.getStatusLine().getStatusCode());
entityAsMap(deleteWorkflowResponse);

detector.setEnabled(false);
Response updateResponse = makeRequest(client(), "PUT", SecurityAnalyticsPlugin.DETECTOR_BASE_URI + "/" + detectorId, Collections.emptyMap(), toHttpEntity(detector));
Assert.assertEquals(200, updateResponse.getStatusLine().getStatusCode());

try {
detector.setEnabled(true);
makeRequest(client(), "PUT", SecurityAnalyticsPlugin.DETECTOR_BASE_URI + "/" + detectorId, Collections.emptyMap(), toHttpEntity(detector));
} catch (ResponseException ex) {
Assert.assertEquals(400, ex.getResponse().getStatusLine().getStatusCode());
Assert.assertEquals(true, ex.getMessage().contains(String.format("Underlying workflow associated with detector %s not found. " +
"Delete and recreate the detector to restore functionality.", detector.getName())));
}
}

@SuppressWarnings("unchecked")
public void testDeletingADetector_single_ruleTopicIndex() throws IOException {
String index = createTestIndex(randomIndex(), windowsIndexMapping());
Expand Down
Loading
Loading