Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

create sap error history index #859

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,15 @@
*/
package org.opensearch.securityanalytics.config.monitors;

import java.util.List;
import java.util.stream.Collectors;
import org.opensearch.common.inject.Inject;
import org.opensearch.securityanalytics.logtype.LogTypeService;
import org.opensearch.securityanalytics.model.Detector;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import org.opensearch.securityanalytics.model.LogType;


public class DetectorMonitorConfig {

public static final String OPENSEARCH_SAP_RULE_INDEX_TEMPLATE = ".opensearch-sap-detectors-queries-index-template";
public static final String OPENSEARCH_SAP_ERROR_INDEX = ".opensearch-sap-error-history";

public static String getRuleIndex(String logType) {
return String.format(Locale.getDefault(), ".opensearch-sap-%s-detectors-queries", logType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,11 @@
import org.opensearch.securityanalytics.rules.exceptions.SigmaError;
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;
import org.opensearch.securityanalytics.threatIntel.DetectorThreatIntelService;
import org.opensearch.securityanalytics.util.DetectorIndices;
import org.opensearch.securityanalytics.util.DetectorUtils;
import org.opensearch.securityanalytics.util.IndexUtils;
import org.opensearch.securityanalytics.util.MonitorService;
import org.opensearch.securityanalytics.util.RuleIndices;
import org.opensearch.securityanalytics.util.RuleTopicIndices;
import org.opensearch.securityanalytics.util.SecurityAnalyticsException;
import org.opensearch.securityanalytics.util.WorkflowService;
import org.opensearch.securityanalytics.util.*;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: we should avoid wildcard imports

import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -118,7 +111,6 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand All @@ -135,6 +127,8 @@ public class TransportIndexDetectorAction extends HandledTransportAction<IndexDe

private final DetectorIndices detectorIndices;

private final ErrorsHistoryIndex errorHistoryIndex;

private final RuleTopicIndices ruleTopicIndices;

private final RuleIndices ruleIndices;
Expand Down Expand Up @@ -196,7 +190,7 @@ public TransportIndexDetectorAction(TransportService transportService,
this.enabledWorkflowUsage = SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE.get(this.settings);
this.monitorService = new MonitorService(client);
this.workflowService = new WorkflowService(client, monitorService);

this.errorHistoryIndex = new ErrorsHistoryIndex(logTypeService, client, clusterService, threadPool);
this.clusterService.getClusterSettings().addSettingsUpdateConsumer(SecurityAnalyticsSettings.FILTER_BY_BACKEND_ROLES, this::setFilterByEnabled);
this.clusterService.getClusterSettings().addSettingsUpdateConsumer(SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE, this::setEnabledWorkflowUsage);
}
Expand All @@ -210,7 +204,6 @@ protected void doExecute(Task task, IndexDetectorRequest request, ActionListener
listener.onFailure(SecurityAnalyticsException.wrap(new OpenSearchStatusException(validateBackendRoleMessage, RestStatus.FORBIDDEN)));
return;
}

checkIndicesAndExecute(task, request, listener, user);
}

Expand Down Expand Up @@ -245,6 +238,11 @@ public void onFailure(Exception e) {
else {
listener.onFailure(SecurityAnalyticsException.wrap(e));
}
try {
initErrorHistoryIndexAndAddErrors(request, e.toString());
} catch (IOException ex) {
listener.onFailure(SecurityAnalyticsException.wrap(ex));
}
}
});
}
Expand Down Expand Up @@ -1087,7 +1085,6 @@ void createDetector() {
log.debug("user from original context is {}", originalContextUser);
request.getDetector().setUser(originalContextUser);


if (!detector.getInputs().isEmpty()) {
try {
ruleTopicIndices.initRuleTopicIndexTemplate(new ActionListener<>() {
Expand Down Expand Up @@ -1522,9 +1519,15 @@ private void onOperation(IndexResponse response, Detector detector) {
}

private void onFailures(Exception t) {
log.info("Exception failures while creating detector: {}", t.toString());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's log this as error level. Same comment for 1529. We can then avoid the t.toString() with just

log.error("Exception failures while creating detector", t);

if (counter.compareAndSet(false, true)) {
finishHim(null, t);
}
try {
initErrorHistoryIndexAndAddErrors(request, t.toString());
} catch (IOException e) {
log.info("Create SAP error index exception", e);
}
}

private void finishHim(Detector detector, Exception t) {
Expand Down Expand Up @@ -1573,6 +1576,50 @@ private Map<String, String> mapMonitorIds(List<IndexMonitorResponse> monitorResp
}
}

private void initErrorHistoryIndexAndAddErrors(IndexDetectorRequest request, String exceptionMessage) throws IOException {

if (!errorHistoryIndex.errorHistoryIndexExists()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take it or leave it - negating conditions is a bit harder to read than testing for the non-negated condition first. In a case like this where the method is an 'either or', I find it's easiest to read as

private void method() {
  if (condition) {
    // logic
    return;
  }

  // logic for !condition
}

errorHistoryIndex.initErrorsHistoryIndex(new ActionListener<>() {
@Override
public void onResponse(CreateIndexResponse response) {
errorHistoryIndex.onCreateMappingsResponse(response);
try {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style nit: the pattern of nesting these action listeners is tough to read imo. This is far from the most flagrant use of it in the codebase but I would still advocate for a private method to handle the try/catch and inner logic. In this case it would also reduce duplication with the else condition below

errorHistoryIndex.addErrorsToSAPHistoryIndex(request, exceptionMessage, indexTimeout, new ActionListener<>() {
@Override
public void onResponse(IndexResponse indexResponse) {
log.info("Successfully updated error history index");
}
@Override
public void onFailure(Exception ex) {
log.info("Exception while inserting a document in error history index: {}", ex);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

info -> error

}

});
} catch (IOException e) {
log.info("Exception while inserting a document in error history index: {}", e);
}
}
@Override
public void onFailure(Exception e) {
log.debug("Exception while creating error history index: {}", e);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

debug -> error

}
});
} else {
errorHistoryIndex.addErrorsToSAPHistoryIndex(request, exceptionMessage, indexTimeout, new ActionListener<>() {
@Override
public void onResponse(IndexResponse indexResponse) {
log.info("Successfully updated error history index");
}
@Override
public void onFailure(Exception ex) {
log.debug("Exception while creating error history index: {}", ex);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

debug -> error

}

});
}
}


private void setFilterByEnabled(boolean filterByEnabled) {
this.filterByEnabled = filterByEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.securityanalytics.util;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.securityanalytics.action.IndexDetectorRequest;
import org.opensearch.securityanalytics.config.monitors.DetectorMonitorConfig;
import org.opensearch.securityanalytics.logtype.LogTypeService;
import org.opensearch.securityanalytics.model.Detector;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.Objects;
import java.util.Locale;

public class ErrorsHistoryIndex {

private static final Logger log = LogManager.getLogger(ErrorsHistoryIndex.class);

private final Client client;

private final ClusterService clusterService;

private final ThreadPool threadPool;

private final LogTypeService logTypeService;

public ErrorsHistoryIndex(LogTypeService logTypeService, Client client, ClusterService clusterService, ThreadPool threadPool) {
this.client = client;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.logTypeService = logTypeService;
}

public static String errorsHistoryIndexMappings() throws IOException {
return new String(Objects.requireNonNull(RuleIndices.class.getClassLoader().getResourceAsStream("mappings/errorsHistory.json")).readAllBytes(), Charset.defaultCharset());
}
public void initErrorsHistoryIndex(ActionListener<CreateIndexResponse> actionListener) throws IOException {
Settings errorHistoryIndexSettings = Settings.builder()
.put("index.hidden", true)
.build();
CreateIndexRequest indexRequest = new CreateIndexRequest(DetectorMonitorConfig.OPENSEARCH_SAP_ERROR_INDEX)
.mapping(errorsHistoryIndexMappings())
.settings(errorHistoryIndexSettings);
client.admin().indices().create(indexRequest, actionListener);
}

public void addErrorsToSAPHistoryIndex(IndexDetectorRequest request, String exception, TimeValue indexTimeout, ActionListener<IndexResponse> actionListener) throws IOException {
Detector detector = request.getDetector();
String ruleTopic = detector.getDetectorType();
String indexName = DetectorMonitorConfig.OPENSEARCH_SAP_ERROR_INDEX;
Instant timestamp = detector.getLastUpdateTime();
String detectorId = detector.getId();
String operation = detectorId.isEmpty() ? "CREATE_DETECTOR" : "UPDATE_DETECTOR";
String user = detector.getUser() == null ? "user" : detector.getUser().getName();

XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
builder.field("detectorId", detectorId);
builder.field("exception", exception);
builder.field("timestamp", timestamp);
builder.field("logType", ruleTopic);
builder.field("operation", operation);
builder.field("user", user);
builder.endObject();
IndexRequest indexRequest = new IndexRequest(indexName)
.id(UUIDs.base64UUID())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we let OpenSearch generate the ID rather than creating a UUID?

.source(builder)
.timeout(indexTimeout)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.index(indexRequest, actionListener);
}
public void onCreateMappingsResponse(CreateIndexResponse response) {
if (response.isAcknowledged()) {
log.info(String.format(Locale.getDefault(), "Created %s with mappings.", DetectorMonitorConfig.OPENSEARCH_SAP_ERROR_INDEX));
} else {
log.error(String.format(Locale.getDefault(), "Create %s mappings call not acknowledged.", DetectorMonitorConfig.OPENSEARCH_SAP_ERROR_INDEX));
throw new OpenSearchStatusException(String.format(Locale.getDefault(), "Create %s mappings call not acknowledged", Detector.DETECTORS_INDEX), RestStatus.INTERNAL_SERVER_ERROR);
}
}

public boolean errorHistoryIndexExists() {
ClusterState clusterState = clusterService.state();
return clusterState.getRoutingTable().hasIndex(DetectorMonitorConfig.OPENSEARCH_SAP_ERROR_INDEX);
}
}
26 changes: 26 additions & 0 deletions src/main/resources/mappings/errorsHistory.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"dynamic": "strict",
"_meta" : {
"schema_version": 1
},
"properties": {
"detectorId": {
"type": "keyword"
},
"exception": {
"type": "text"
},
"timestamp": {
"type": "text"
},
"operation": {
"type": "keyword"
},
"logType": {
"type": "keyword"
},
"user": {
"type": "text"
}
}
}
Loading