Skip to content

Commit

Permalink
create sap error history index
Browse files Browse the repository at this point in the history
Signed-off-by: Riya Saxena <[email protected]>
  • Loading branch information
riysaxen-amzn committed Feb 17, 2024
1 parent e3362f6 commit 844c2f9
Show file tree
Hide file tree
Showing 5 changed files with 251 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,15 @@
*/
package org.opensearch.securityanalytics.config.monitors;

import java.util.List;
import java.util.stream.Collectors;
import org.opensearch.common.inject.Inject;
import org.opensearch.securityanalytics.logtype.LogTypeService;
import org.opensearch.securityanalytics.model.Detector;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import org.opensearch.securityanalytics.model.LogType;


public class DetectorMonitorConfig {

public static final String OPENSEARCH_SAP_RULE_INDEX_TEMPLATE = ".opensearch-sap-detectors-queries-index-template";
public static final String OPENSEARCH_SAP_ERROR_INDEX = ".opensearch-sap-error-history";

public static String getRuleIndex(String logType) {
return String.format(Locale.getDefault(), ".opensearch-sap-%s-detectors-queries", logType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,11 @@
import org.opensearch.securityanalytics.rules.exceptions.SigmaError;
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;
import org.opensearch.securityanalytics.threatIntel.DetectorThreatIntelService;
import org.opensearch.securityanalytics.util.DetectorIndices;
import org.opensearch.securityanalytics.util.DetectorUtils;
import org.opensearch.securityanalytics.util.IndexUtils;
import org.opensearch.securityanalytics.util.MonitorService;
import org.opensearch.securityanalytics.util.RuleIndices;
import org.opensearch.securityanalytics.util.RuleTopicIndices;
import org.opensearch.securityanalytics.util.SecurityAnalyticsException;
import org.opensearch.securityanalytics.util.WorkflowService;
import org.opensearch.securityanalytics.util.*;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -118,7 +111,6 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand All @@ -135,6 +127,8 @@ public class TransportIndexDetectorAction extends HandledTransportAction<IndexDe

private final DetectorIndices detectorIndices;

private final ErrorsHistoryIndex errorHistoryIndex;

private final RuleTopicIndices ruleTopicIndices;

private final RuleIndices ruleIndices;
Expand Down Expand Up @@ -196,7 +190,7 @@ public TransportIndexDetectorAction(TransportService transportService,
this.enabledWorkflowUsage = SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE.get(this.settings);
this.monitorService = new MonitorService(client);
this.workflowService = new WorkflowService(client, monitorService);

this.errorHistoryIndex = new ErrorsHistoryIndex(logTypeService, client, clusterService, threadPool);
this.clusterService.getClusterSettings().addSettingsUpdateConsumer(SecurityAnalyticsSettings.FILTER_BY_BACKEND_ROLES, this::setFilterByEnabled);
this.clusterService.getClusterSettings().addSettingsUpdateConsumer(SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE, this::setEnabledWorkflowUsage);
}
Expand All @@ -210,7 +204,6 @@ protected void doExecute(Task task, IndexDetectorRequest request, ActionListener
listener.onFailure(SecurityAnalyticsException.wrap(new OpenSearchStatusException(validateBackendRoleMessage, RestStatus.FORBIDDEN)));
return;
}

checkIndicesAndExecute(task, request, listener, user);
}

Expand Down Expand Up @@ -245,6 +238,11 @@ public void onFailure(Exception e) {
else {
listener.onFailure(SecurityAnalyticsException.wrap(e));
}
try {
initErrorHistoryIndexAndAddErrors(request, e.toString());
} catch (IOException ex) {
listener.onFailure(SecurityAnalyticsException.wrap(ex));
}
}
});
}
Expand Down Expand Up @@ -1087,7 +1085,6 @@ void createDetector() {
log.debug("user from original context is {}", originalContextUser);
request.getDetector().setUser(originalContextUser);


if (!detector.getInputs().isEmpty()) {
try {
ruleTopicIndices.initRuleTopicIndexTemplate(new ActionListener<>() {
Expand Down Expand Up @@ -1522,9 +1519,15 @@ private void onOperation(IndexResponse response, Detector detector) {
}

private void onFailures(Exception t) {
log.info("Exception failures while creating detector: {}", t.toString());
if (counter.compareAndSet(false, true)) {
finishHim(null, t);
}
try {
initErrorHistoryIndexAndAddErrors(request, t.toString());
} catch (IOException e) {
log.info("Create SAP error index exception", e);
}
}

private void finishHim(Detector detector, Exception t) {
Expand Down Expand Up @@ -1573,6 +1576,50 @@ private Map<String, String> mapMonitorIds(List<IndexMonitorResponse> monitorResp
}
}

private void initErrorHistoryIndexAndAddErrors(IndexDetectorRequest request, String exceptionMessage) throws IOException {

if (!errorHistoryIndex.errorHistoryIndexExists()) {
errorHistoryIndex.initErrorsHistoryIndex(new ActionListener<>() {
@Override
public void onResponse(CreateIndexResponse response) {
errorHistoryIndex.onCreateMappingsResponse(response);
try {
errorHistoryIndex.addErrorsToSAPHistoryIndex(request, exceptionMessage, indexTimeout, new ActionListener<>() {
@Override
public void onResponse(IndexResponse indexResponse) {
log.info("Successfully updated error history index");
}
@Override
public void onFailure(Exception ex) {
log.info("Exception while inserting a document in error history index: {}", ex);
}

});
} catch (IOException e) {
log.info("Exception while inserting a document in error history index: {}", e);
}
}
@Override
public void onFailure(Exception e) {
log.debug("Exception while creating error history index: {}", e);
}
});
} else {
errorHistoryIndex.addErrorsToSAPHistoryIndex(request, exceptionMessage, indexTimeout, new ActionListener<>() {
@Override
public void onResponse(IndexResponse indexResponse) {
log.info("Successfully updated error history index");
}
@Override
public void onFailure(Exception ex) {
log.debug("Exception while creating error history index: {}", ex);
}

});
}
}


private void setFilterByEnabled(boolean filterByEnabled) {
this.filterByEnabled = filterByEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.securityanalytics.util;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.securityanalytics.action.IndexDetectorRequest;
import org.opensearch.securityanalytics.config.monitors.DetectorMonitorConfig;
import org.opensearch.securityanalytics.logtype.LogTypeService;
import org.opensearch.securityanalytics.model.Detector;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.Objects;
import java.util.Locale;

public class ErrorsHistoryIndex {

private static final Logger log = LogManager.getLogger(ErrorsHistoryIndex.class);

private final Client client;

private final ClusterService clusterService;

private final ThreadPool threadPool;

private final LogTypeService logTypeService;

public ErrorsHistoryIndex(LogTypeService logTypeService, Client client, ClusterService clusterService, ThreadPool threadPool) {
this.client = client;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.logTypeService = logTypeService;
}

public static String errorsHistoryIndexMappings() throws IOException {
return new String(Objects.requireNonNull(RuleIndices.class.getClassLoader().getResourceAsStream("mappings/errorsHistory.json")).readAllBytes(), Charset.defaultCharset());
}
public void initErrorsHistoryIndex(ActionListener<CreateIndexResponse> actionListener) throws IOException {
Settings errorHistoryIndexSettings = Settings.builder()
.put("index.hidden", true)
.build();
CreateIndexRequest indexRequest = new CreateIndexRequest(DetectorMonitorConfig.OPENSEARCH_SAP_ERROR_INDEX)
.mapping(errorsHistoryIndexMappings())
.settings(errorHistoryIndexSettings);
client.admin().indices().create(indexRequest, actionListener);
}

public void addErrorsToSAPHistoryIndex(IndexDetectorRequest request, String exception, TimeValue indexTimeout, ActionListener<IndexResponse> actionListener) throws IOException {
Detector detector = request.getDetector();
String ruleTopic = detector.getDetectorType();
String indexName = DetectorMonitorConfig.OPENSEARCH_SAP_ERROR_INDEX;
Instant timestamp = detector.getLastUpdateTime();
String detectorId = detector.getId();
String operation = detectorId.isEmpty() ? "CREATE_DETECTOR" : "UPDATE_DETECTOR";
String user = detector.getUser() == null ? "user" : detector.getUser().getName();

XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
builder.field("detectorId", detectorId);
builder.field("exception", exception);
builder.field("timestamp", timestamp);
builder.field("logType", ruleTopic);
builder.field("operation", operation);
builder.field("user", user);
builder.endObject();
IndexRequest indexRequest = new IndexRequest(indexName)
.id(UUIDs.base64UUID())
.source(builder)
.timeout(indexTimeout)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.index(indexRequest, actionListener);
}
public void onCreateMappingsResponse(CreateIndexResponse response) {
if (response.isAcknowledged()) {
log.info(String.format(Locale.getDefault(), "Created %s with mappings.", DetectorMonitorConfig.OPENSEARCH_SAP_ERROR_INDEX));
} else {
log.error(String.format(Locale.getDefault(), "Create %s mappings call not acknowledged.", DetectorMonitorConfig.OPENSEARCH_SAP_ERROR_INDEX));
throw new OpenSearchStatusException(String.format(Locale.getDefault(), "Create %s mappings call not acknowledged", Detector.DETECTORS_INDEX), RestStatus.INTERNAL_SERVER_ERROR);
}
}

public boolean errorHistoryIndexExists() {
ClusterState clusterState = clusterService.state();
return clusterState.getRoutingTable().hasIndex(DetectorMonitorConfig.OPENSEARCH_SAP_ERROR_INDEX);
}
}
26 changes: 26 additions & 0 deletions src/main/resources/mappings/errorsHistory.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"dynamic": "strict",
"_meta" : {
"schema_version": 1
},
"properties": {
"detectorId": {
"type": "keyword"
},
"exception": {
"type": "text"
},
"timestamp": {
"type": "text"
},
"operation": {
"type": "keyword"
},
"logType": {
"type": "keyword"
},
"user": {
"type": "text"
}
}
}
Loading

0 comments on commit 844c2f9

Please sign in to comment.