Skip to content

Commit

Permalink
ML: Adds set_upgrade_mode API endpoint (#37837)
Browse files Browse the repository at this point in the history
* ML: Add MlMetadata.upgrade_mode and API

* Adding tests

* Adding wait conditionals for the upgrade_mode call to return

* Adding tests

* adjusting format and tests

* Adjusting wait conditions for api return and msgs

* adjusting doc tests

* adding upgrade mode tests to black list
  • Loading branch information
benwtrent authored Jan 28, 2019
1 parent e401ab1 commit 7e4c0e6
Show file tree
Hide file tree
Showing 24 changed files with 1,112 additions and 70 deletions.
1 change: 1 addition & 0 deletions docs/reference/ml/apis/get-ml-info.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ This is a possible response:
"scroll_size" : 1000
}
},
"upgrade_mode": false,
"limits" : { }
}
----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateCalendarJobAction;
Expand Down Expand Up @@ -291,6 +292,7 @@ public List<Action<? extends ActionResponse>> getClientActions() {
PostCalendarEventsAction.INSTANCE,
PersistJobAction.INSTANCE,
FindFileStructureAction.INSTANCE,
SetUpgradeModeAction.INSTANCE,
// security
ClearRealmCacheAction.INSTANCE,
ClearRolesCacheAction.INSTANCE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,25 +48,30 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
public static final String TYPE = "ml";
private static final ParseField JOBS_FIELD = new ParseField("jobs");
private static final ParseField DATAFEEDS_FIELD = new ParseField("datafeeds");
public static final ParseField UPGRADE_MODE = new ParseField("upgrade_mode");

public static final MlMetadata EMPTY_METADATA = new MlMetadata(Collections.emptySortedMap(), Collections.emptySortedMap());
public static final MlMetadata EMPTY_METADATA = new MlMetadata(Collections.emptySortedMap(), Collections.emptySortedMap(), false);
// This parser follows the pattern that metadata is parsed leniently (to allow for enhancements)
public static final ObjectParser<Builder, Void> LENIENT_PARSER = new ObjectParser<>("ml_metadata", true, Builder::new);

static {
LENIENT_PARSER.declareObjectArray(Builder::putJobs, (p, c) -> Job.LENIENT_PARSER.apply(p, c).build(), JOBS_FIELD);
LENIENT_PARSER.declareObjectArray(Builder::putDatafeeds,
(p, c) -> DatafeedConfig.LENIENT_PARSER.apply(p, c).build(), DATAFEEDS_FIELD);
LENIENT_PARSER.declareBoolean(Builder::isUpgradeMode, UPGRADE_MODE);

}

private final SortedMap<String, Job> jobs;
private final SortedMap<String, DatafeedConfig> datafeeds;
private final boolean upgradeMode;
private final GroupOrJobLookup groupOrJobLookup;

private MlMetadata(SortedMap<String, Job> jobs, SortedMap<String, DatafeedConfig> datafeeds) {
private MlMetadata(SortedMap<String, Job> jobs, SortedMap<String, DatafeedConfig> datafeeds, boolean upgradeMode) {
this.jobs = Collections.unmodifiableSortedMap(jobs);
this.datafeeds = Collections.unmodifiableSortedMap(datafeeds);
this.groupOrJobLookup = new GroupOrJobLookup(jobs.values());
this.upgradeMode = upgradeMode;
}

public Map<String, Job> getJobs() {
Expand Down Expand Up @@ -94,6 +99,10 @@ public Set<String> expandDatafeedIds(String expression, boolean allowNoDatafeeds
.expand(expression, allowNoDatafeeds);
}

public boolean isUpgradeMode() {
return upgradeMode;
}

@Override
public Version getMinimalSupportedVersion() {
return Version.V_6_0_0_alpha1;
Expand Down Expand Up @@ -128,12 +137,20 @@ public MlMetadata(StreamInput in) throws IOException {
}
this.datafeeds = datafeeds;
this.groupOrJobLookup = new GroupOrJobLookup(jobs.values());
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
this.upgradeMode = in.readBoolean();
} else {
this.upgradeMode = false;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
writeMap(jobs, out);
writeMap(datafeeds, out);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeBoolean(upgradeMode);
}
}

private static <T extends Writeable> void writeMap(Map<String, T> map, StreamOutput out) throws IOException {
Expand All @@ -150,6 +167,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"), params);
mapValuesToXContent(JOBS_FIELD, jobs, builder, extendedParams);
mapValuesToXContent(DATAFEEDS_FIELD, datafeeds, builder, extendedParams);
builder.field(UPGRADE_MODE.getPreferredName(), upgradeMode);
return builder;
}

Expand All @@ -170,17 +188,24 @@ public static class MlMetadataDiff implements NamedDiff<MetaData.Custom> {

final Diff<Map<String, Job>> jobs;
final Diff<Map<String, DatafeedConfig>> datafeeds;
final boolean upgradeMode;

MlMetadataDiff(MlMetadata before, MlMetadata after) {
this.jobs = DiffableUtils.diff(before.jobs, after.jobs, DiffableUtils.getStringKeySerializer());
this.datafeeds = DiffableUtils.diff(before.datafeeds, after.datafeeds, DiffableUtils.getStringKeySerializer());
this.upgradeMode = after.upgradeMode;
}

public MlMetadataDiff(StreamInput in) throws IOException {
this.jobs = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Job::new,
MlMetadataDiff::readJobDiffFrom);
this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DatafeedConfig::new,
MlMetadataDiff::readDatafeedDiffFrom);
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
upgradeMode = in.readBoolean();
} else {
upgradeMode = false;
}
}

/**
Expand All @@ -192,13 +217,16 @@ public MlMetadataDiff(StreamInput in) throws IOException {
public MetaData.Custom apply(MetaData.Custom part) {
TreeMap<String, Job> newJobs = new TreeMap<>(jobs.apply(((MlMetadata) part).jobs));
TreeMap<String, DatafeedConfig> newDatafeeds = new TreeMap<>(datafeeds.apply(((MlMetadata) part).datafeeds));
return new MlMetadata(newJobs, newDatafeeds);
return new MlMetadata(newJobs, newDatafeeds, upgradeMode);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
jobs.writeTo(out);
datafeeds.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeBoolean(upgradeMode);
}
}

@Override
Expand All @@ -223,7 +251,8 @@ public boolean equals(Object o) {
return false;
MlMetadata that = (MlMetadata) o;
return Objects.equals(jobs, that.jobs) &&
Objects.equals(datafeeds, that.datafeeds);
Objects.equals(datafeeds, that.datafeeds) &&
Objects.equals(upgradeMode, that.upgradeMode);
}

@Override
Expand All @@ -233,13 +262,14 @@ public final String toString() {

@Override
public int hashCode() {
return Objects.hash(jobs, datafeeds);
return Objects.hash(jobs, datafeeds, upgradeMode);
}

public static class Builder {

private TreeMap<String, Job> jobs;
private TreeMap<String, DatafeedConfig> datafeeds;
private boolean upgradeMode;

public Builder() {
jobs = new TreeMap<>();
Expand All @@ -253,6 +283,7 @@ public Builder(@Nullable MlMetadata previous) {
} else {
jobs = new TreeMap<>(previous.jobs);
datafeeds = new TreeMap<>(previous.datafeeds);
upgradeMode = previous.upgradeMode;
}
}

Expand Down Expand Up @@ -318,8 +349,13 @@ public Builder putDatafeeds(Collection<DatafeedConfig> datafeeds) {
return this;
}

public Builder isUpgradeMode(boolean upgradeMode) {
this.upgradeMode = upgradeMode;
return this;
}

public MlMetadata build() {
return new MlMetadata(jobs, datafeeds);
return new MlMetadata(jobs, datafeeds, upgradeMode);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ public final class MlTasks {
public static final String JOB_TASK_NAME = "xpack/ml/job";
public static final String DATAFEED_TASK_NAME = "xpack/ml/datafeed";

private static final String JOB_TASK_ID_PREFIX = "job-";
private static final String DATAFEED_TASK_ID_PREFIX = "datafeed-";
public static final String JOB_TASK_ID_PREFIX = "job-";
public static final String DATAFEED_TASK_ID_PREFIX = "datafeed-";

public static final PersistentTasksCustomMetaData.Assignment AWAITING_UPGRADE =
new PersistentTasksCustomMetaData.Assignment(null,
"persistent task cannot be assigned while upgrade mode is enabled.");

private MlTasks() {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;

public class SetUpgradeModeAction extends Action<AcknowledgedResponse> {

public static final SetUpgradeModeAction INSTANCE = new SetUpgradeModeAction();
public static final String NAME = "cluster:admin/xpack/ml/upgrade_mode";

private SetUpgradeModeAction() {
super(NAME);
}

@Override
public AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
}

public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {

private boolean enabled;

private static final ParseField ENABLED = new ParseField("enabled");
public static final ConstructingObjectParser<Request, Void> PARSER =
new ConstructingObjectParser<>(NAME, a -> new Request((Boolean)a[0]));

static {
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED);
}

public Request(boolean enabled) {
this.enabled = enabled;
}

public Request(StreamInput in) throws IOException {
readFrom(in);
}

public Request() {
}

public boolean isEnabled() {
return enabled;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.enabled = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(enabled);
}

@Override
public int hashCode() {
return Objects.hash(enabled);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || obj.getClass() != getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.equals(enabled, other.enabled);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(ENABLED.getPreferredName(), enabled);
builder.endObject();
return builder;
}
}

static class RequestBuilder extends ActionRequestBuilder<Request, AcknowledgedResponse> {

RequestBuilder(ElasticsearchClient client, SetUpgradeModeAction action) {
super(client, action, new Request());
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction.Request;

public class SetUpgradeModeActionRequestTests extends AbstractSerializingTestCase<Request> {

@Override
protected Request createTestInstance() {
return new Request(randomBoolean());
}

@Override
protected boolean supportsUnknownFields() {
return false;
}

@Override
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}

@Override
protected Request doParseInstance(XContentParser parser) {
return Request.PARSER.apply(parser, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateCalendarJobAction;
Expand Down Expand Up @@ -868,6 +869,7 @@ public void testMachineLearningAdminRole() {
assertThat(role.cluster().check(PutFilterAction.NAME, request), is(true));
assertThat(role.cluster().check(PutJobAction.NAME, request), is(true));
assertThat(role.cluster().check(RevertModelSnapshotAction.NAME, request), is(true));
assertThat(role.cluster().check(SetUpgradeModeAction.NAME, request), is(true));
assertThat(role.cluster().check(StartDatafeedAction.NAME, request), is(true));
assertThat(role.cluster().check(StopDatafeedAction.NAME, request), is(true));
assertThat(role.cluster().check(UpdateCalendarJobAction.NAME, request), is(true));
Expand Down Expand Up @@ -938,6 +940,7 @@ public void testMachineLearningUserRole() {
assertThat(role.cluster().check(PutFilterAction.NAME, request), is(false));
assertThat(role.cluster().check(PutJobAction.NAME, request), is(false));
assertThat(role.cluster().check(RevertModelSnapshotAction.NAME, request), is(false));
assertThat(role.cluster().check(SetUpgradeModeAction.NAME, request), is(false));
assertThat(role.cluster().check(StartDatafeedAction.NAME, request), is(false));
assertThat(role.cluster().check(StopDatafeedAction.NAME, request), is(false));
assertThat(role.cluster().check(UpdateCalendarJobAction.NAME, request), is(false));
Expand Down
6 changes: 5 additions & 1 deletion x-pack/plugin/ml/qa/ml-with-security/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,11 @@ integTestRunner {
'ml/validate/Test job config that is invalid only because of the job ID',
'ml/validate_detector/Test invalid detector',
'ml/delete_forecast/Test delete on _all forecasts not allow no forecasts',
'ml/delete_forecast/Test delete forecast on missing forecast'
'ml/delete_forecast/Test delete forecast on missing forecast',
'ml/set_upgrade_mode/Attempt to open job when upgrade_mode is enabled',
'ml/set_upgrade_mode/Setting upgrade_mode to enabled',
'ml/set_upgrade_mode/Setting upgrade mode to disabled from enabled',
'ml/set_upgrade_mode/Test setting upgrade_mode to false when it is already false'
].join(',')
}

Expand Down
Loading

0 comments on commit 7e4c0e6

Please sign in to comment.