Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REST high-level client: add synced flush API #29189

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
Expand Down Expand Up @@ -263,6 +265,27 @@ public void flushAsync(FlushRequest flushRequest, ActionListener<FlushResponse>
listener, emptySet(), headers);
}

/** Initiate a synced flush manually using the synced flush API
* <p>
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-synced-flush.html">
* Synced flush API on elastic.co</a>
*/
public SyncedFlushResponse syncedFlush(SyncedFlushRequest syncedFlushRequest, Header... headers) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(syncedFlushRequest, Request::syncedFlush,
SyncedFlushResponse::fromXContent, emptySet(), headers);
}

/**
* Asynchronously initiate a synced flush manually using the synced flush API
* <p>
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-synced-flush.html">
* Synced flush API on elastic.co</a>
*/
public void syncedFlushAsync(SyncedFlushRequest syncedFlushRequest, ActionListener<SyncedFlushResponse> listener, Header... headers) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as odd as this sounds, could you rename the methods to flushSynced as that's how this API is referred to in our SPEC ? request and response can and should stay the same.

restHighLevelClient.performRequestAsyncAndParseEntity(syncedFlushRequest, Request::syncedFlush,
SyncedFlushResponse::fromXContent, listener, emptySet(), headers);
}

/**
* Force merge one or more indices using the Force Merge API
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
Expand Down Expand Up @@ -234,6 +235,15 @@ static Request flush(FlushRequest flushRequest) {
return new Request(HttpPost.METHOD_NAME, endpoint, parameters.getParams(), null);
}

static Request syncedFlush(SyncedFlushRequest syncedFlushRequest) {
String[] indices = syncedFlushRequest.indices() == null ? Strings.EMPTY_ARRAY : syncedFlushRequest.indices();
String endpoint = endpoint(indices, "_flush", "synced");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_flush/synced can now be provided as a single argument, it also won't be encoded this way which is fine as we know that it doesn't need to be encoded.

Params syncedFlushparameters = Params.builder();
// This request takes no other parameters other than the indices.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would remove this comment

syncedFlushparameters.withIndicesOptions(syncedFlushRequest.indicesOptions());
return new Request(HttpPost.METHOD_NAME, endpoint, syncedFlushparameters.getParams(), null);
}

static Request forceMerge(ForceMergeRequest forceMergeRequest) {
String[] indices = forceMergeRequest.indices() == null ? Strings.EMPTY_ARRAY : forceMergeRequest.indices();
String endpoint = endpoint(indices, "_forcemerge");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
Expand Down Expand Up @@ -442,6 +444,32 @@ public void testFlush() throws IOException {
}
}

public void testSyncedFlush() throws IOException {
{
String index = "index";
Settings settings = Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.build();
createIndex(index, settings);
SyncedFlushRequest syncedFlushRequest = new SyncedFlushRequest(index);
SyncedFlushResponse flushResponse =
execute(syncedFlushRequest, highLevelClient().indices()::syncedFlush, highLevelClient().indices()::syncedFlushAsync);
assertThat(flushResponse.totalShards(), equalTo(1));
assertThat(flushResponse.successfulShards(), equalTo(1));
assertThat(flushResponse.failedShards(), equalTo(0));
//assertThat(flushResponse.shardFailures(), equalTo(BroadcastResponse.EMPTY));
}
{
String nonExistentIndex = "non_existent_index";
assertFalse(indexExists(nonExistentIndex));
SyncedFlushRequest syncedFlushRequest = new SyncedFlushRequest(nonExistentIndex);
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> execute(syncedFlushRequest, highLevelClient().indices()::syncedFlush, highLevelClient().indices()::syncedFlushAsync));
assertEquals(RestStatus.NOT_FOUND, exception.status());
}
}

public void testClearCache() throws IOException {
{
String index = "index";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
Expand Down Expand Up @@ -622,6 +623,31 @@ public void testFlush() {
assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME));
}

public void testSyncedFlush() {
String[] indices = randomBoolean() ? null : randomIndicesNames(0, 5);
SyncedFlushRequest syncedFlushRequest;
if (randomBoolean()) {
syncedFlushRequest = new SyncedFlushRequest(indices);
} else {
syncedFlushRequest = new SyncedFlushRequest();
syncedFlushRequest.indices(indices);
}
Map<String, String> expectedParams = new HashMap<>();
setRandomIndicesOptions(syncedFlushRequest::indicesOptions, syncedFlushRequest::indicesOptions, expectedParams);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you remove one of these empty lines please?

Request request = Request.syncedFlush(syncedFlushRequest);
StringJoiner endpoint = new StringJoiner("/", "/", "");
if (indices != null && indices.length > 0) {
endpoint.add(String.join(",", indices));
}
endpoint.add("_flush");
endpoint.add("synced");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

endpoint.add("_flush/synced") ?

assertThat(request.getEndpoint(), equalTo(endpoint.toString()));
assertThat(request.getParameters(), equalTo(expectedParams));
assertThat(request.getEntity(), nullValue());
assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME));
}

public void testForceMerge() {
String[] indices = randomBoolean() ? null : randomIndicesNames(0, 5);
ForceMergeRequest forceMergeRequest;
Expand Down
Loading