Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature/extensions] Migrates Start Detector #837

Merged

Conversation

joshpalis
Copy link
Member

@joshpalis joshpalis commented Mar 7, 2023

Description

Migrates the AnomalyDetectorJobTransportAction, RestAnomalyDetectorJobAction, and other supporting classes from plugin to extension.

This PR implements the following :

  1. AnomalyDetectorJobTransportAction/RestAnomalyDetectorJobAction - which is used for both stop and start detector
  2. Integrates the Job Scheduler Communication Mechanism. This includes registering job details, replacing the lockService with rest calls to job scheduler and registers the ADJobParameterTransportAction/ADJobRunnerTransportAction to facilitate job index entry parsing and job execution.
  3. Instantiation of the following metadata indices : .opendistro-anomaly-detection-state, .opendistro-anomaly-results, .opendistro-anomaly-detector-job, .opensearch-job-scheduler-job-details, and .opendistro-job-scheduler-lock
  4. Enables the creation, configuration, and execution of anomaly detector jobs

At the state of this PR, when a job is executed, this will continue running in an infinite loop, as the job execution workflow ends when AnomalyDetectorJobRunner.runAnomalyDetectionJob is invoked, which is commented out and will be integrated with the SDK in this issue : opensearch-project/opensearch-sdk-java#626. This eventually calls the AnomalyResultAction which indexes the anomaly result of the detector and releases the lock and thus ending the job. Subsequent attempts of Job Scheduler to execute the next run of the same Job will generate an anomaly result exception document due to a null lock, since the first execution has not yet released the lock. Indexing this anomaly result exception then instantiates the default anomaly result index.

The full workflow of the communication between Job Scheduler and Anomaly Detection Extension can be seen here : opensearch-project/opensearch-sdk-java#427

Issues Resolved

opensearch-project/opensearch-sdk-java#383

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Joshua Palis <[email protected]>
Signed-off-by: Joshua Palis <[email protected]>
Signed-off-by: Joshua Palis <[email protected]>
…g AnomalyDetectorJobTransportAction class to jacoco exclusions

Signed-off-by: Joshua Palis <[email protected]>
@codecov-commenter
Copy link

codecov-commenter commented Mar 7, 2023

Codecov Report

Merging #837 (bb740db) into feature/extensions (ec858b2) will decrease coverage by 4.98%.
The diff coverage is 3.06%.

📣 This organization is not using Codecov’s GitHub App Integration. We recommend you install it so Codecov can continue to function properly for your repositories. Learn more

Impacted file tree graph

@@                   Coverage Diff                    @@
##             feature/extensions     #837      +/-   ##
========================================================
- Coverage                 39.89%   34.92%   -4.98%     
+ Complexity                 2111     1865     -246     
========================================================
  Files                       297      298       +1     
  Lines                     17113    17510     +397     
  Branches                   1843     1857      +14     
========================================================
- Hits                       6827     6115     -712     
- Misses                     9799    10967    +1168     
+ Partials                    487      428      -59     
Flag Coverage Δ
plugin 34.92% <3.06%> (-4.98%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...va/org/opensearch/ad/AnomalyDetectorExtension.java 0.00% <0.00%> (ø)
...va/org/opensearch/ad/AnomalyDetectorJobRunner.java 0.00% <0.00%> (ø)
.../main/java/org/opensearch/ad/NodeStateManager.java 0.76% <ø> (-63.08%) ⬇️
.../java/org/opensearch/ad/caching/PriorityCache.java 0.00% <ø> (ø)
.../java/org/opensearch/ad/ratelimit/BatchWorker.java 100.00% <ø> (ø)
.../opensearch/ad/ratelimit/CheckpointReadWorker.java 0.72% <ø> (-86.96%) ⬇️
...opensearch/ad/ratelimit/CheckpointWriteWorker.java 93.82% <ø> (ø)
.../org/opensearch/ad/ratelimit/ColdEntityWorker.java 95.45% <ø> (ø)
.../org/opensearch/ad/ratelimit/ConcurrentWorker.java 68.00% <ø> (-8.00%) ⬇️
...opensearch/ad/ratelimit/EntityColdStartWorker.java 100.00% <ø> (ø)
... and 26 more

... and 26 files with indirect coverage changes

… AD_JOB_TYPE to AnomalyDetectorExtension

Signed-off-by: Joshua Palis <[email protected]>
Signed-off-by: Joshua Palis <[email protected]>
…bRunner. Modifies updateLatestFlagofOldTasksAndCreateNewTask() to use a blocking rest request instead of client.execute()

Signed-off-by: Joshua Palis <[email protected]>
Signed-off-by: Joshua Palis <[email protected]>
…/ create components. Migrates start detector workflow invocations of opensearch actions with corresponding opensearchAsyncClient methods

Signed-off-by: Joshua Palis <[email protected]>
…aAsyncClient as part of create components to close later along with the sdkRestClient

Signed-off-by: Joshua Palis <[email protected]>
…duler rather than the action name

Signed-off-by: Joshua Palis <[email protected]>
Copy link
Member

@owaiskazi19 owaiskazi19 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work with the migration and integrating job scheduler! Good to see the workflow.

Copy link
Member

@dbwiddis dbwiddis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM with some observations and an agreement about a rename that you can consider now or later when we re-branch and merge.

return ImmutableList
.of(sdkRestClient, anomalyDetectionIndices, jvmService, adCircuitBreakerService, adTaskManager, adTaskCacheManager);
.of(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a comment as I haven't checked. Ideally the order here and in code above defining these matches the AD Plugin, to minimize the diff.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes I've followed it as closely as I could. There are some components that have not been implemented yet, since I havent needed them yet. But overall, the order in which the components are created and returned is the same as ADPlugin

Math.max(1, OpenSearchExecutors.allocatedProcessors(settings) / 2),
TimeValue.timeValueMinutes(10),
AD_THREAD_POOL_PREFIX + AD_THREAD_POOL_NAME
)// ,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Style) Would really prefer block comment format-preserving comments here. (Start with /*- and keep the indentation in the middle before the */.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ill go ahead and fix this

AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX,
false,
ThrowingSupplierWrapper.throwingSupplierWrapper(AnomalyDetectionIndices::getAnomalyDetectorJobMappings)
),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No complaint here, just pointing to the diff above as a reason we shouldn't be using // comments on blocks of code :)

// }
//
// }
/**
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UGH, this diff!!!!

I don't even know what changed!

Copy link
Member

@owaiskazi19 owaiskazi19 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Almost there. Few more nitpicks and questions

releaseLockRequest.addParameter(LockModel.LOCK_ID, lock.getLockId());

try {
Response releaseLockResponse = client.performRequest(releaseLockRequest);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is performRequest async?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

performRequest is a blocking call. There is currently no async mechanism to send a rest call via the SDKRestClient.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@joshpalis let's add async in SDK and use it here. This can be done in the next PR.

try {
CompletableFuture<DeleteByQueryResponse> deleteResponse = sdkJavaAsyncClient.deleteByQuery(deleteRequest);
DeleteByQueryResponse response = deleteResponse.orTimeout(10L, TimeUnit.SECONDS).get();
if (response.timedOut() || !response.failures().isEmpty()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should test this whenever we uncomment the tests and also can you create an issue on opensearch-java for getBulkFailures?

logger.warn(BULK_FAILURE_LOG_MSG + " {}", detectorID);
for (BulkIndexByScrollFailure bulkFailure : response.failures()) {
logger.warn(bulkFailure);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a little confusing to me. We have BulkIndexByScrollFailure but old code has ScrollableHitSource which is use for the scrollable results. @saratvemulapalli need some inputs here to verify the changes in this file

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@VachaShah just wanted to get your input on this. Please correct me if I am wrong, but as we discussed before, the DeleteByQueryResponse failures for opensearch-java's OpenSearchAsyncClient must be read from the list of BulkIndexByScrollFailure and there are no corresponding types for the HighLevelRestClient's SearchFailure and BulkFailure.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes for DeleteByQueryResponse in the java-client, the failures need to be read from the list of BulkByIndexScrollFailures.

src/main/java/org/opensearch/ad/task/ADTaskManager.java Outdated Show resolved Hide resolved
Copy link
Member

@owaiskazi19 owaiskazi19 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking for some inputs from @VachaShah! If any code change is required @joshpalis you can handle it in the next PR!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants