From 75c442902f2715e4b41f96e77230ac773d0eb2d2 Mon Sep 17 00:00:00 2001 From: Megha Goyal <56077967+goyamegh@users.noreply.github.com> Date: Fri, 8 Mar 2024 14:32:32 -0800 Subject: [PATCH] Add search request timeouts for correlations workflows (#893) * Reinstating more leaks plugged-in for correlations workflows Signed-off-by: Megha Goyal * Add search timeouts to all correlation searches Signed-off-by: Megha Goyal * Fix logging and exception messages Signed-off-by: Megha Goyal * Change search timeout to 30 seconds Signed-off-by: Megha Goyal --------- Signed-off-by: Megha Goyal --- .../correlation/JoinEngine.java | 7 ++ .../correlation/VectorEmbeddingsEngine.java | 21 ++++- .../TransportCorrelateFindingAction.java | 89 +++++++++---------- 3 files changed, 69 insertions(+), 48 deletions(-) diff --git a/src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java b/src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java index b33c4d43b..3b4314e12 100644 --- a/src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java +++ b/src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java @@ -10,6 +10,7 @@ import org.apache.lucene.search.join.ScoreMode; import org.opensearch.OpenSearchStatusException; import org.opensearch.cluster.routing.Preference; +import org.opensearch.common.unit.TimeValue; import org.opensearch.commons.alerting.model.DocLevelQuery; import org.opensearch.core.action.ActionListener; import org.opensearch.action.search.MultiSearchRequest; @@ -132,6 +133,7 @@ private void generateAutoCorrelations(Detector detector, Finding finding) throws searchRequest.indices(DetectorMonitorConfig.getAllFindingsIndicesPattern(logTypeName)); searchRequest.source(sourceBuilder); searchRequest.preference(Preference.PRIMARY_FIRST.type()); + searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L)); mSearchRequest.add(searchRequest); } @@ -214,6 +216,7 @@ private void onAutoCorrelations(Detector detector, Finding finding, Map { if (response.isTimedOut()) { @@ -277,6 +280,7 @@ private void getValidDocuments(String detectorType, List indices, List searchRequest.indices(DetectorMonitorConfig.getAllFindingsIndicesPattern(relatedDocIds.getKey())); searchRequest.source(searchSourceBuilder); searchRequest.preference(Preference.PRIMARY_FIRST.type()); + searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L)); categories.add(relatedDocIds.getKey()); mSearchRequest.add(searchRequest); diff --git a/src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java b/src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java index 86fc70bbd..78f7dc765 100644 --- a/src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java +++ b/src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java @@ -32,6 +32,7 @@ import org.opensearch.securityanalytics.transport.TransportCorrelateFindingAction; import org.opensearch.securityanalytics.util.CorrelationIndices; +import java.util.Arrays; import java.util.List; import java.util.Locale; import java.util.Map; @@ -94,6 +95,7 @@ public void insertCorrelatedFindings(String detectorType, Finding finding, Strin request.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP); request.source(searchSourceBuilder); request.preference(Preference.PRIMARY_FIRST.type()); + request.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L)); mSearchRequest.add(request); } @@ -195,6 +197,12 @@ public void insertCorrelatedFindings(String detectorType, Finding finding, Strin } public void insertOrphanFindings(String detectorType, Finding finding, float timestampFeature, Map logTypes) { + if (logTypes.get(detectorType) == null ) { + log.debug("Missing detector type {} in the log types index for finding id {}. Keys in the index: {}", + detectorType, finding.getId(), Arrays.toString(logTypes.keySet().toArray())); + onFailure(new OpenSearchStatusException("insertOrphanFindings null log types for detector type: " + detectorType, RestStatus.INTERNAL_SERVER_ERROR)); + } + SearchRequest searchRequest = getSearchMetadataIndexRequest(detectorType, finding, logTypes); Map tags = logTypes.get(detectorType).getTags(); String correlationId = tags.get("correlation_id").toString(); @@ -251,7 +259,8 @@ public void insertOrphanFindings(String detectorType, Finding finding, float tim onFailure(ex); } } else { - onFailure(new OpenSearchStatusException(indexResponse.toString(), RestStatus.INTERNAL_SERVER_ERROR)); + onFailure(new OpenSearchStatusException("Indexing failed with response {} ", + indexResponse.status(), indexResponse.toString())); } }, this::onFailure)); } else { @@ -297,7 +306,8 @@ public void insertOrphanFindings(String detectorType, Finding finding, float tim onFailure(ex); } } else { - onFailure(new OpenSearchStatusException(indexResponse.toString(), RestStatus.INTERNAL_SERVER_ERROR)); + onFailure(new OpenSearchStatusException("Indexing failed with response {} ", + indexResponse.status(), indexResponse.toString())); } }, this::onFailure)); } else { @@ -323,6 +333,7 @@ public void insertOrphanFindings(String detectorType, Finding finding, float tim request.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP); request.source(searchSourceBuilder); request.preference(Preference.PRIMARY_FIRST.type()); + request.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L)); client.search(request, ActionListener.wrap(searchResponse -> { if (searchResponse.isTimedOut()) { @@ -407,6 +418,9 @@ public void insertOrphanFindings(String detectorType, Finding finding, float tim } catch (Exception ex) { onFailure(ex); } + } else { + onFailure(new OpenSearchStatusException("Indexing failed with response {} ", + indexResponse.status(), indexResponse.toString())); } }, this::onFailure)); } catch (Exception ex) { @@ -432,7 +446,7 @@ private void indexCorrelatedFindings(XContentBuilder builder) { if (response.status().equals(RestStatus.CREATED)) { correlateFindingAction.onOperation(); } else { - onFailure(new OpenSearchStatusException(response.toString(), RestStatus.INTERNAL_SERVER_ERROR)); + onFailure(new OpenSearchStatusException("Indexing failed with response {} ", response.status(), response.toString())); } }, this::onFailure)); } @@ -454,6 +468,7 @@ private SearchRequest getSearchMetadataIndexRequest(String detectorType, Finding searchRequest.indices(CorrelationIndices.CORRELATION_METADATA_INDEX); searchRequest.source(searchSourceBuilder); searchRequest.preference(Preference.PRIMARY_FIRST.type()); + searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L)); return searchRequest; } diff --git a/src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java b/src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java index d5e0eed32..910794556 100644 --- a/src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java +++ b/src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java @@ -172,13 +172,13 @@ protected void doExecute(Task task, ActionRequest request, ActionListener { if (response.isTimedOut()) { @@ -245,8 +246,8 @@ void start() { ); Detector detector = Detector.docParse(xcp, hit.getId(), hit.getVersion()); joinEngine.onSearchDetectorResponse(detector, finding); - } catch (IOException e) { - log.error("IOException for request {}", searchRequest.toString(), e); + } catch (Exception e) { + log.error("Exception for request {}", searchRequest.toString(), e); onFailures(e); } } else { @@ -277,7 +278,7 @@ public void initCorrelationIndex(String detectorType, Map> } else { getTimestampFeature(detectorType, correlatedFindings, null, correlationRules); } - } catch (IOException ex) { + } catch (Exception ex) { onFailures(ex); } } @@ -353,7 +354,8 @@ public void getTimestampFeature(String detectorType, Map> c }, this::onFailures)); }, this::onFailures)); } else { - log.error(new OpenSearchStatusException("Failed to create correlation metadata Index", RestStatus.INTERNAL_SERVER_ERROR)); + Exception e = new OpenSearchStatusException("Failed to create correlation metadata Index", RestStatus.INTERNAL_SERVER_ERROR); + onFailures(e); } }, this::onFailures)); } else { @@ -364,54 +366,49 @@ public void getTimestampFeature(String detectorType, Map> c if (response.getHits().getHits().length == 0) { onFailures(new ResourceNotFoundException( "Failed to find hits in metadata index for finding id {}", request.getFinding().getId())); - } - - String id = response.getHits().getHits()[0].getId(); - Map hitSource = response.getHits().getHits()[0].getSourceAsMap(); - long scoreTimestamp = (long) hitSource.get("scoreTimestamp"); + } else { + String id = response.getHits().getHits()[0].getId(); + Map hitSource = response.getHits().getHits()[0].getSourceAsMap(); + long scoreTimestamp = (long) hitSource.get("scoreTimestamp"); - long newScoreTimestamp = findingTimestamp - CorrelationIndices.FIXED_HISTORICAL_INTERVAL; - if (newScoreTimestamp > scoreTimestamp) { - try { + long newScoreTimestamp = findingTimestamp - CorrelationIndices.FIXED_HISTORICAL_INTERVAL; + if (newScoreTimestamp > scoreTimestamp) { IndexRequest scoreIndexRequest = getCorrelationMetadataIndexRequest(id, newScoreTimestamp); client.index(scoreIndexRequest, ActionListener.wrap(indexResponse -> { - SearchRequest searchRequest = getSearchLogTypeIndexRequest(); + SearchRequest searchRequest = getSearchLogTypeIndexRequest(); client.search(searchRequest, ActionListener.wrap(searchResponse -> { - if (searchResponse.isTimedOut()) { - onFailures(new OpenSearchStatusException("Search request timed out", RestStatus.REQUEST_TIMEOUT)); - } + if (searchResponse.isTimedOut()) { + onFailures(new OpenSearchStatusException("Search request timed out", RestStatus.REQUEST_TIMEOUT)); + } - SearchHit[] hits = searchResponse.getHits().getHits(); - Map logTypes = new HashMap<>(); - for (SearchHit hit : hits) { - Map sourceMap = hit.getSourceAsMap(); - logTypes.put(sourceMap.get("name").toString(), - new CustomLogType(sourceMap)); - } + SearchHit[] hits = searchResponse.getHits().getHits(); + Map logTypes = new HashMap<>(); + for (SearchHit hit : hits) { + Map sourceMap = hit.getSourceAsMap(); + logTypes.put(sourceMap.get("name").toString(), new CustomLogType(sourceMap)); + } - if (correlatedFindings != null) { - if (correlatedFindings.isEmpty()) { - vectorEmbeddingsEngine.insertOrphanFindings(detectorType, request.getFinding(), Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), logTypes); - } - for (Map.Entry> correlatedFinding : correlatedFindings.entrySet()) { - vectorEmbeddingsEngine.insertCorrelatedFindings(detectorType, request.getFinding(), correlatedFinding.getKey(), correlatedFinding.getValue(), - Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), correlationRules, logTypes); - } - } else { - vectorEmbeddingsEngine.insertOrphanFindings(detectorType, orphanFinding, Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), logTypes); + if (correlatedFindings != null) { + if (correlatedFindings.isEmpty()) { + vectorEmbeddingsEngine.insertOrphanFindings(detectorType, request.getFinding(), Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), logTypes); } - }, this::onFailures)); + for (Map.Entry> correlatedFinding : correlatedFindings.entrySet()) { + vectorEmbeddingsEngine.insertCorrelatedFindings(detectorType, request.getFinding(), correlatedFinding.getKey(), correlatedFinding.getValue(), + Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), correlationRules, logTypes); + } + } else { + vectorEmbeddingsEngine.insertOrphanFindings(detectorType, orphanFinding, Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), logTypes); + } + }, this::onFailures)); }, this::onFailures)); - } catch (Exception ex) { - onFailures(ex); - } - } else { - float timestampFeature = Long.valueOf((findingTimestamp - scoreTimestamp) / 1000L).floatValue(); + } else { + float timestampFeature = Long.valueOf((findingTimestamp - scoreTimestamp) / 1000L).floatValue(); - SearchRequest searchRequest = getSearchLogTypeIndexRequest(); - insertFindings(timestampFeature, searchRequest, correlatedFindings, detectorType, correlationRules, orphanFinding); + SearchRequest searchRequest = getSearchLogTypeIndexRequest(); + insertFindings(timestampFeature, searchRequest, correlatedFindings, detectorType, correlationRules, orphanFinding); + } } }, this::onFailures)); } @@ -430,6 +427,7 @@ private SearchRequest getSearchLogTypeIndexRequest() { SearchRequest searchRequest = new SearchRequest(); searchRequest.indices(LogTypeService.LOG_TYPE_INDEX); searchRequest.source(searchSourceBuilder); + searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L)); return searchRequest; } @@ -439,13 +437,13 @@ private IndexRequest getCorrelationMetadataIndexRequest(String id, long newScore scoreBuilder.field("root", false); scoreBuilder.endObject(); - IndexRequest scoreIndexRequest = new IndexRequest(CorrelationIndices.CORRELATION_METADATA_INDEX) + return new IndexRequest(CorrelationIndices.CORRELATION_METADATA_INDEX) .id(id) .source(scoreBuilder) .timeout(indexTimeout) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - return scoreIndexRequest; } + private void insertFindings(float timestampFeature, SearchRequest searchRequest, Map> correlatedFindings, String detectorType, List correlationRules, Finding orphanFinding) { client.search(searchRequest, ActionListener.wrap(response -> { if (response.isTimedOut()) { @@ -485,6 +483,7 @@ private SearchRequest getSearchMetadataIndexRequest() { searchRequest.indices(CorrelationIndices.CORRELATION_METADATA_INDEX); searchRequest.source(searchSourceBuilder); searchRequest.preference(Preference.PRIMARY_FIRST.type()); + searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L)); return searchRequest; }