From 0b8ebebc954890b1a3eddceb6d45ade8e9af7da7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Tue, 8 Sep 2020 10:44:54 +0200 Subject: [PATCH] Add repositories metering API This pull request adds a new set of APIs that allows tracking the number of requests performed by the different registered repositories. In order to avoid losing data, the repository statistics are archived after the repository is closed for a configurable retention period `repositories.stats.archive.retention_period`. The API exposes the statistics for the active repositories as well as the modified/closed repositories. Backport of #60371 --- ...ear-repositories-metering-archive.asciidoc | 35 ++ .../apis/get-repositories-metering.asciidoc | 35 ++ .../apis/repositories-meterings-body.asciidoc | 178 +++++++++ .../repositories-metering-apis.asciidoc | 16 + docs/reference/rest-api/index.asciidoc | 2 + .../azure/AzureBlobStoreRepositoryTests.java | 31 +- .../repositories/azure/AzureBlobStore.java | 35 +- .../repositories/azure/AzureRepository.java | 12 +- ...eCloudStorageBlobStoreRepositoryTests.java | 21 +- .../GoogleCloudStorageOperationsStats.java | 7 +- .../gcs/GoogleCloudStorageRepository.java | 12 +- .../s3/S3BlobStoreRepositoryTests.java | 13 +- .../repositories/s3/S3BlobStore.java | 8 +- .../repositories/s3/S3Repository.java | 17 +- .../repositories/RepositoriesService.java | 51 ++- .../RepositoriesStatsArchive.java | 121 ++++++ .../repositories/RepositoryInfo.java | 132 +++++++ .../repositories/RepositoryStats.java | 21 + .../repositories/RepositoryStatsSnapshot.java | 114 ++++++ .../blobstore/MeteredBlobStoreRepository.java | 59 +++ .../RepositoriesServiceTests.java | 119 +++++- .../RepositoriesStatsArchiveTests.java | 118 ++++++ .../fixtures/azure-fixture/docker-compose.yml | 9 + test/fixtures/gcs-fixture/docker-compose.yml | 12 + test/fixtures/s3-fixture/docker-compose.yml | 15 + ...ESMockAPIBasedRepositoryIntegTestCase.java | 46 +-- .../test/rest/ESRestTestCase.java | 53 +++ .../repositories-metering-api/build.gradle | 46 +++ .../qa/azure/build.gradle | 99 +++++ .../azure/AzureRepositoriesMeteringIT.java | 54 +++ .../repositories-metering-api/qa/build.gradle | 6 + .../qa/gcs/build.gradle | 130 ++++++ .../gcs/GCSRepositoriesMeteringIT.java | 53 +++ .../qa/s3/build.gradle | 75 ++++ .../metering/s3/S3RepositoriesMeteringIT.java | 54 +++ .../metering/RepositoriesMeteringPlugin.java | 56 +++ ...learRepositoriesMeteringArchiveAction.java | 19 + ...earRepositoriesMeteringArchiveRequest.java | 37 ++ .../action/RepositoriesMeteringAction.java | 19 + .../action/RepositoriesMeteringRequest.java | 22 ++ .../action/RepositoriesMeteringResponse.java | 53 +++ .../RepositoriesNodeMeteringResponse.java | 50 +++ ...rtClearRepositoriesStatsArchiveAction.java | 98 +++++ .../TransportRepositoriesStatsAction.java | 84 ++++ ...learRepositoriesMeteringArchiveAction.java | 46 +++ .../RestGetRepositoriesMeteringAction.java | 41 ++ ...ctRepositoriesMeteringAPIRestTestCase.java | 374 ++++++++++++++++++ .../RepositoriesMeteringResponseTests.java | 104 +++++ ...stractSearchableSnapshotsRestTestCase.java | 41 -- 49 files changed, 2712 insertions(+), 141 deletions(-) create mode 100644 docs/reference/repositories-metering-api/apis/clear-repositories-metering-archive.asciidoc create mode 100644 docs/reference/repositories-metering-api/apis/get-repositories-metering.asciidoc create mode 100644 docs/reference/repositories-metering-api/apis/repositories-meterings-body.asciidoc create mode 100644 docs/reference/repositories-metering-api/repositories-metering-apis.asciidoc create mode 100644 server/src/main/java/org/elasticsearch/repositories/RepositoriesStatsArchive.java create mode 100644 server/src/main/java/org/elasticsearch/repositories/RepositoryInfo.java create mode 100644 server/src/main/java/org/elasticsearch/repositories/RepositoryStatsSnapshot.java create mode 100644 server/src/main/java/org/elasticsearch/repositories/blobstore/MeteredBlobStoreRepository.java create mode 100644 server/src/test/java/org/elasticsearch/repositories/RepositoriesStatsArchiveTests.java create mode 100644 x-pack/plugin/repositories-metering-api/build.gradle create mode 100644 x-pack/plugin/repositories-metering-api/qa/azure/build.gradle create mode 100644 x-pack/plugin/repositories-metering-api/qa/azure/src/test/java/org/elasticsearch/xpack/repositories/metering/azure/AzureRepositoriesMeteringIT.java create mode 100644 x-pack/plugin/repositories-metering-api/qa/build.gradle create mode 100644 x-pack/plugin/repositories-metering-api/qa/gcs/build.gradle create mode 100644 x-pack/plugin/repositories-metering-api/qa/gcs/src/test/java/org/elasticsearch/xpack/repositories/metering/gcs/GCSRepositoriesMeteringIT.java create mode 100644 x-pack/plugin/repositories-metering-api/qa/s3/build.gradle create mode 100644 x-pack/plugin/repositories-metering-api/qa/s3/src/test/java/org/elasticsearch/xpack/repositories/metering/s3/S3RepositoriesMeteringIT.java create mode 100644 x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/RepositoriesMeteringPlugin.java create mode 100644 x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/ClearRepositoriesMeteringArchiveAction.java create mode 100644 x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/ClearRepositoriesMeteringArchiveRequest.java create mode 100644 x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringAction.java create mode 100644 x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringRequest.java create mode 100644 x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringResponse.java create mode 100644 x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesNodeMeteringResponse.java create mode 100644 x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/TransportClearRepositoriesStatsArchiveAction.java create mode 100644 x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/TransportRepositoriesStatsAction.java create mode 100644 x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/rest/RestClearRepositoriesMeteringArchiveAction.java create mode 100644 x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/rest/RestGetRepositoriesMeteringAction.java create mode 100644 x-pack/plugin/repositories-metering-api/src/test/java/org/elasticsearch/xpack/repositories/metering/AbstractRepositoriesMeteringAPIRestTestCase.java create mode 100644 x-pack/plugin/repositories-metering-api/src/test/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringResponseTests.java diff --git a/docs/reference/repositories-metering-api/apis/clear-repositories-metering-archive.asciidoc b/docs/reference/repositories-metering-api/apis/clear-repositories-metering-archive.asciidoc new file mode 100644 index 0000000000000..33441ee5efa31 --- /dev/null +++ b/docs/reference/repositories-metering-api/apis/clear-repositories-metering-archive.asciidoc @@ -0,0 +1,35 @@ +[role="xpack"] +[testenv="basic"] +[[clear-repositories-metering-archive-api]] +=== Clear repositories metering archive +++++ +Clear repositories metering archive +++++ + +Removes the archived repositories metering information present in the cluster. + +[[clear-repositories-metering-archive-api-request]] +==== {api-request-title} + +`DELETE /_nodes//_repositories_metering/` + +[[clear-repositories-metering-archive-api-desc]] +==== {api-description-title} + +You can use this API to clear the archived repositories metering information in the cluster. + +[[clear-repositories-metering-archive-api-path-params]] +==== {api-path-parms-title} + +include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=node-id] + +``:: + (long) Specifies the maximum <> to be cleared from the archive. + +All the nodes selective options are explained <>. +[role="child_attributes"] +[[clear-repositories-metering-archive-api-response-body]] +==== {api-response-body-title} +Returns the deleted archived repositories metering information. + +include::{es-repo-dir}/repositories-metering-api/apis/repositories-meterings-body.asciidoc[tag=repositories-metering-body] diff --git a/docs/reference/repositories-metering-api/apis/get-repositories-metering.asciidoc b/docs/reference/repositories-metering-api/apis/get-repositories-metering.asciidoc new file mode 100644 index 0000000000000..409d74cbde295 --- /dev/null +++ b/docs/reference/repositories-metering-api/apis/get-repositories-metering.asciidoc @@ -0,0 +1,35 @@ +[role="xpack"] +[testenv="basic"] +[[get-repositories-metering-api]] +=== Get repositories metering information +++++ +Get repositories metering information +++++ + +Returns cluster repositories metering information. + +[[get-repositories-metering-api-request]] +==== {api-request-title} + +`GET /_nodes//_repositories_metering` + +[[get-repositories-metering-api-desc]] +==== {api-description-title} + +You can use the cluster repositories metering API to retrieve repositories metering information in a cluster. + +This API exposes monotonically non-decreasing counters and it's expected that clients would durably store +the information needed to compute aggregations over a period of time. Additionally, the information +exposed by this API is volatile, meaning that it won't be present after node restarts. + +[[get-repositories-metering-api-path-params]] +==== {api-path-parms-title} + +include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=node-id] + +All the nodes selective options are explained <>. + +[role="child_attributes"] +[[get-repositories-metering-api-response-body]] +==== {api-response-body-title} +include::{es-repo-dir}/repositories-metering-api/apis/repositories-meterings-body.asciidoc[tag=repositories-metering-body] diff --git a/docs/reference/repositories-metering-api/apis/repositories-meterings-body.asciidoc b/docs/reference/repositories-metering-api/apis/repositories-meterings-body.asciidoc new file mode 100644 index 0000000000000..fa37bb6ba1853 --- /dev/null +++ b/docs/reference/repositories-metering-api/apis/repositories-meterings-body.asciidoc @@ -0,0 +1,178 @@ +tag::repositories-metering-body[] +`_nodes`:: +(object) +Contains statistics about the number of nodes selected by the request. ++ +.Properties of `_nodes` +[%collapsible%open] +==== +`total`:: +(integer) +Total number of nodes selected by the request. + +`successful`:: +(integer) +Number of nodes that responded successfully to the request. + +`failed`:: +(integer) +Number of nodes that rejected the request or failed to respond. If this value +is not `0`, a reason for the rejection or failure is included in the response. +==== + +`cluster_name`:: +(string) +Name of the cluster. Based on the <> setting. + +`nodes`:: +(object) +Contains repositories metering information for the nodes selected by the request. ++ +.Properties of `nodes` +[%collapsible%open] +==== +``:: +(array) +An array of repository metering information for the node. ++ +.Properties of objects in `node_id` +[%collapsible%open] +===== +`repository_name`:: +(string) +Repository name. + +`repository_type`:: +(string) +Repository type. + +`repository_location`:: +(object) +Represents an unique location within the repository. ++ +.Properties of `repository_location` for repository type `Azure` +[%collapsible%open] +====== +`base_path`:: +(string) +The path within the container where the repository stores data. + +`container`:: +(string) +Container name. +====== ++ +.Properties of `repository_location` for repository type `GCP` +[%collapsible%open] +====== +`base_path`:: +(string) +The path within the bucket where the repository stores data. + +`bucket`:: +(string) +Bucket name. +====== ++ +.Properties of `repository_location` for repository type `S3` +[%collapsible%open] +====== +`base_path`:: +(string) +The path within the bucket where the repository stores data. + +`bucket`:: +(string) +Bucket name. +====== +`repository_ephemeral_id`:: +(string) +An identifier that changes every time the repository is updated. + +`repository_started_at`:: +(long) +Time the repository was created or updated. Recorded in milliseconds +since the https://en.wikipedia.org/wiki/Unix_time[Unix Epoch]. + +`repository_stopped_at`:: +(Optional, long) +Time the repository was deleted or updated. Recorded in milliseconds +since the https://en.wikipedia.org/wiki/Unix_time[Unix Epoch]. + +`archived`:: +(boolean) +A flag that tells whether or not this object has been archived. +When a repository is closed or updated the repository metering information +is archived and kept for a certain period of time. This allows retrieving +the repository metering information of previous repository instantiations. + +`archive_version`:: +(Optional, long) +The cluster state version when this object was archived, this field +can be used as a logical timestamp to delete all the archived metrics up +to an observed version. This field is only present for archived +repository metering information objects. The main purpose of this +field is to avoid possible race conditions during repository metering +information deletions, i.e. deleting archived repositories metering +information that we haven't observed yet. + +`request_counts`:: +(object) +An object with the number of request performed against the repository +grouped by request type. ++ +.Properties of `request_counts` for repository type `Azure` +[%collapsible%open] +====== +`GetBlobProperties`:: +(long) Number of https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties[Get Blob Properties] requests. +`GetBlob`:: +(long) Number of https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob[Get Blob] requests. +`ListBlobs`:: +(long) Number of https://docs.microsoft.com/en-us/rest/api/storageservices/list-blobs[List Blobs] requests. +`PutBlob`:: +(long) Number of https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob[Put Blob] requests. +`PutBlock`:: +(long) Number of https://docs.microsoft.com/en-us/rest/api/storageservices/put-block[Put Block]. +`PutBlockList`:: +(long) Number of https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list[Put Block List] requests. + +Azure storage https://azure.microsoft.com/en-us/pricing/details/storage/blobs/[pricing]. +====== ++ +.Properties of `request_counts` for repository type `GCP` +[%collapsible%open] +====== +`GetObject`:: +(long) Number of https://cloud.google.com/storage/docs/json_api/v1/objects/get[get object] requests. +`ListObjects`:: +(long) Number of https://cloud.google.com/storage/docs/json_api/v1/objects/list[list objects] requests. +`InsertObject`:: +(long) Number of https://cloud.google.com/storage/docs/json_api/v1/objects/insert[insert object] requests, +including https://cloud.google.com/storage/docs/uploading-objects[simple], https://cloud.google.com/storage/docs/json_api/v1/how-tos/multipart-upload[multipart] and +https://cloud.google.com/storage/docs/resumable-uploads[resumable] uploads. Resumable uploads can perform multiple http requests to +insert a single object but they are considered as a single request since they are https://cloud.google.com/storage/docs/resumable-uploads#introduction[billed] as an individual operation. + +Google Cloud storage https://cloud.google.com/storage/pricing[pricing]. +====== ++ +.Properties of `request_counts` for repository type `S3` +[%collapsible%open] +====== +`GetObject`:: +(long) Number of https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html[GetObject] requests. +`ListObjects`:: +(long) Number of https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjects.html[ListObjects] requests. +`PutObject`:: +(long) Number of https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html[PutObject] requests. +`PutMultipartObject`:: +(long) Number of https://docs.aws.amazon.com/AmazonS3/latest/dev/mpuoverview.html[Multipart] requests, +including https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html[CreateMultipartUpload], +https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html[UploadPart] and https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html[CompleteMultipartUpload] +requests. + +Amazon Web Services Simple Storage Service https://aws.amazon.com/s3/pricing/[pricing]. +====== +===== +==== +end::repositories-metering-body[] diff --git a/docs/reference/repositories-metering-api/repositories-metering-apis.asciidoc b/docs/reference/repositories-metering-api/repositories-metering-apis.asciidoc new file mode 100644 index 0000000000000..7d6a0b8da0a2c --- /dev/null +++ b/docs/reference/repositories-metering-api/repositories-metering-apis.asciidoc @@ -0,0 +1,16 @@ +[role="xpack"] +[testenv="basic"] +[[repositories-metering-apis]] +== Repositories metering APIs + +experimental[] + +You can use the following APIs to retrieve repositories metering information. + +This is an API used by Elastic's commercial offerings. + +* <> +* <> + +include::apis/get-repositories-metering.asciidoc[] +include::apis/clear-repositories-metering-archive.asciidoc[] diff --git a/docs/reference/rest-api/index.asciidoc b/docs/reference/rest-api/index.asciidoc index 99ae417cff87b..cfe21493308de 100644 --- a/docs/reference/rest-api/index.asciidoc +++ b/docs/reference/rest-api/index.asciidoc @@ -30,6 +30,7 @@ endif::[] * <> * <> * <> +* <> * <> * <> ifdef::permanently-unreleased-branch[] @@ -63,6 +64,7 @@ include::{es-repo-dir}/ml/anomaly-detection/apis/index.asciidoc[] include::{es-repo-dir}/ml/df-analytics/apis/index.asciidoc[] include::{es-repo-dir}/migration/migration.asciidoc[] include::{es-repo-dir}/indices/apis/reload-analyzers.asciidoc[] +include::{es-repo-dir}/repositories-metering-api/repositories-metering-apis.asciidoc[] include::{es-repo-dir}/rollup/rollup-api.asciidoc[] include::{es-repo-dir}/search.asciidoc[] ifdef::permanently-unreleased-branch[] diff --git a/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java b/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java index b29009ca2a049..944081d75ea75 100644 --- a/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java +++ b/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java @@ -40,7 +40,6 @@ import java.util.Base64; import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.regex.Pattern; @@ -76,11 +75,6 @@ protected HttpHandler createErroneousHttpHandler(final HttpHandler delegate) { return new AzureErroneousHttpHandler(delegate, randomIntBetween(2, 3)); } - @Override - protected List requestTypesTracked() { - return org.elasticsearch.common.collect.List.of("GET", "LIST", "HEAD", "PUT", "PUT_BLOCK"); - } - @Override protected Settings nodeSettings(int nodeOrdinal) { final String key = Base64.getEncoder().encodeToString(randomAlphaOfLength(10).getBytes(StandardCharsets.UTF_8)); @@ -180,23 +174,28 @@ private AzureHTTPStatsCollectorHandler(HttpHandler delegate) { @Override protected void maybeTrack(String request, Headers headers) { if (Regex.simpleMatch("GET /*/*", request)) { - trackRequest("GET"); + trackRequest("GetBlob"); } else if (Regex.simpleMatch("HEAD /*/*", request)) { - trackRequest("HEAD"); + trackRequest("GetBlobProperties"); } else if (listPattern.matcher(request).matches()) { - trackRequest("LIST"); - } else if (isBlockUpload(request)) { - trackRequest("PUT_BLOCK"); + trackRequest("ListBlobs"); + } else if (isPutBlock(request)) { + trackRequest("PutBlock"); + } else if (isPutBlockList(request)) { + trackRequest("PutBlockList"); } else if (Regex.simpleMatch("PUT /*/*", request)) { - trackRequest("PUT"); + trackRequest("PutBlob"); } } - // https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list // https://docs.microsoft.com/en-us/rest/api/storageservices/put-block - private boolean isBlockUpload(String request) { - return Regex.simpleMatch("PUT /*/*?*comp=blocklist*", request) - || (Regex.simpleMatch("PUT /*/*?*comp=block*", request) && request.contains("blockid=")); + private boolean isPutBlock(String request) { + return Regex.simpleMatch("PUT /*/*?*comp=block*", request) && request.contains("blockid="); + } + + // https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list + private boolean isPutBlockList(String request) { + return Regex.simpleMatch("PUT /*/*?*comp=blocklist*", request); } } } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java index 96e3a3c0fdae7..9626b970817ce 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java @@ -120,19 +120,19 @@ public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service, this.uploadMetricsCollector = (httpURLConnection -> { assert httpURLConnection.getRequestMethod().equals("PUT"); String queryParams = httpURLConnection.getURL().getQuery(); - if (queryParams != null && isBlockUpload(queryParams)) { - stats.putBlockOperations.incrementAndGet(); - } else { + if (queryParams == null) { stats.putOperations.incrementAndGet(); + return; } - }); - } - private boolean isBlockUpload(String queryParams) { - // https://docs.microsoft.com/en-us/rest/api/storageservices/put-block - // https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list - return (queryParams.contains("comp=block") && queryParams.contains("blockid=")) - || queryParams.contains("comp=blocklist"); + // https://docs.microsoft.com/en-us/rest/api/storageservices/put-block + // https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list + if (queryParams.contains("comp=block") && queryParams.contains("blockid=")) { + stats.putBlockOperations.incrementAndGet(); + } else if (queryParams.contains("comp=blocklist")) { + stats.putBlockListOperations.incrementAndGet(); + } + }); } @Override @@ -385,14 +385,15 @@ private static class Stats { private final AtomicLong putBlockOperations = new AtomicLong(); + private final AtomicLong putBlockListOperations = new AtomicLong(); + private Map toMap() { - return org.elasticsearch.common.collect.Map.of( - "GET", getOperations.get(), - "LIST", listOperations.get(), - "HEAD", headOperations.get(), - "PUT", putOperations.get(), - "PUT_BLOCK", putBlockOperations.get() - ); + return org.elasticsearch.common.collect.Map.of("GetBlob", getOperations.get(), + "ListBlobs", listOperations.get(), + "GetBlobProperties", headOperations.get(), + "PutBlob", putOperations.get(), + "PutBlock", putBlockOperations.get(), + "PutBlockList", putBlockListOperations.get()); } } } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java index 4107cba65c315..55e367ea7bd39 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java @@ -33,9 +33,10 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.indices.recovery.RecoverySettings; -import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository; import java.util.Locale; +import java.util.Map; import java.util.function.Function; import static org.elasticsearch.repositories.azure.AzureStorageService.MAX_CHUNK_SIZE; @@ -52,7 +53,7 @@ *
{@code compress}
If set to true metadata files will be stored compressed. Defaults to false.
* */ -public class AzureRepository extends BlobStoreRepository { +public class AzureRepository extends MeteredBlobStoreRepository { private static final Logger logger = LogManager.getLogger(AzureRepository.class); public static final String TYPE = "azure"; @@ -85,7 +86,7 @@ public AzureRepository( final ClusterService clusterService, final RecoverySettings recoverySettings) { super(metadata, Repository.COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService, - recoverySettings); + recoverySettings, buildLocation(metadata)); this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings()); this.storageService = storageService; @@ -111,6 +112,11 @@ public AzureRepository( } } + private static Map buildLocation(RepositoryMetadata metadata) { + return org.elasticsearch.common.collect.Map.of("base_path", Repository.BASE_PATH_SETTING.get(metadata.settings()), + "container", Repository.CONTAINER_SETTING.get(metadata.settings())); + } + @Override protected BlobStore getBlobStore() { return super.getBlobStore(); diff --git a/plugins/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/plugins/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java index 7433eff7c065d..50d51f87abdb4 100644 --- a/plugins/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java +++ b/plugins/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java @@ -62,7 +62,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -120,11 +119,6 @@ protected HttpHandler createErroneousHttpHandler(final HttpHandler delegate) { return new GoogleErroneousHttpHandler(delegate, randomIntBetween(2, 3)); } - @Override - protected List requestTypesTracked() { - return org.elasticsearch.common.collect.List.of("GET", "LIST", "POST", "PUT"); - } - @Override protected Settings nodeSettings(int nodeOrdinal) { final Settings.Builder settings = Settings.builder(); @@ -325,15 +319,18 @@ private static class GoogleCloudStorageStatsCollectorHttpHandler extends HttpSta @Override public void maybeTrack(final String request, Headers requestHeaders) { if (Regex.simpleMatch("GET /storage/v1/b/*/o/*", request)) { - trackRequest("GET"); + trackRequest("GetObject"); } else if (Regex.simpleMatch("GET /storage/v1/b/*/o*", request)) { - trackRequest("LIST"); + trackRequest("ListObjects"); } else if (Regex.simpleMatch("GET /download/storage/v1/b/*", request)) { - trackRequest("GET"); - } else if (Regex.simpleMatch("PUT /upload/storage/v1/b/*", request) && isLastPart(requestHeaders)) { - trackRequest("PUT"); + trackRequest("GetObject"); + } else if (Regex.simpleMatch("PUT /upload/storage/v1/b/*uploadType=resumable*", request) && isLastPart(requestHeaders)) { + // Resumable uploads are billed as a single operation, that's the reason we're tracking + // the request only when it's the last part. + // See https://cloud.google.com/storage/docs/resumable-uploads#introduction + trackRequest("InsertObject"); } else if (Regex.simpleMatch("POST /upload/storage/v1/b/*uploadType=multipart*", request)) { - trackRequest("POST"); + trackRequest("InsertObject"); } } diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageOperationsStats.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageOperationsStats.java index 4a0de249a3e78..2264f71c29f4a 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageOperationsStats.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageOperationsStats.java @@ -58,10 +58,9 @@ String getTrackedBucket() { Map toMap() { final Map results = new HashMap<>(); - results.put("GET", getCount.get()); - results.put("LIST", listCount.get()); - results.put("PUT", putCount.get()); - results.put("POST", postCount.get()); + results.put("GetObject", getCount.get()); + results.put("ListObjects", listCount.get()); + results.put("InsertObject", postCount.get() + putCount.get()); return results; } } diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java index d4b81e88fa4aa..58ec3a1f64822 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java @@ -31,8 +31,9 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.repositories.RepositoryException; -import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository; +import java.util.Map; import java.util.function.Function; import static org.elasticsearch.common.settings.Setting.Property; @@ -40,7 +41,7 @@ import static org.elasticsearch.common.settings.Setting.byteSizeSetting; import static org.elasticsearch.common.settings.Setting.simpleString; -class GoogleCloudStorageRepository extends BlobStoreRepository { +class GoogleCloudStorageRepository extends MeteredBlobStoreRepository { private static final Logger logger = LogManager.getLogger(GoogleCloudStorageRepository.class); // package private for testing @@ -76,7 +77,7 @@ class GoogleCloudStorageRepository extends BlobStoreRepository { final GoogleCloudStorageService storageService, final ClusterService clusterService, final RecoverySettings recoverySettings) { - super(metadata, getSetting(COMPRESS, metadata), namedXContentRegistry, clusterService, recoverySettings); + super(metadata, getSetting(COMPRESS, metadata), namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata)); this.storageService = storageService; String basePath = BASE_PATH.get(metadata.settings()); @@ -96,6 +97,11 @@ class GoogleCloudStorageRepository extends BlobStoreRepository { logger.debug("using bucket [{}], base_path [{}], chunk_size [{}], compress [{}]", bucket, basePath, chunkSize, isCompress()); } + private static Map buildLocation(RepositoryMetadata metadata) { + return org.elasticsearch.common.collect.Map.of("base_path", BASE_PATH.get(metadata.settings()), + "bucket", getSetting(BUCKET, metadata)); + } + @Override protected GoogleCloudStorageBlobStore createBlobStore() { return new GoogleCloudStorageBlobStore(bucket, clientName, metadata.name(), storageService, bufferSize); diff --git a/plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index c61890751af15..d96cf762e0d60 100644 --- a/plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -119,11 +119,6 @@ protected HttpHandler createErroneousHttpHandler(final HttpHandler delegate) { return new S3StatsCollectorHttpHandler(new S3ErroneousHttpHandler(delegate, randomIntBetween(2, 3))); } - @Override - protected List requestTypesTracked() { - return org.elasticsearch.common.collect.List.of("GET", "LIST", "POST", "PUT"); - } - @Override protected Settings nodeSettings(int nodeOrdinal) { final MockSecureSettings secureSettings = new MockSecureSettings(); @@ -287,13 +282,13 @@ private static class S3StatsCollectorHttpHandler extends HttpStatsCollectorHandl @Override public void maybeTrack(final String request, Headers requestHeaders) { if (Regex.simpleMatch("GET /*/?prefix=*", request)) { - trackRequest("LIST"); + trackRequest("ListObjects"); } else if (Regex.simpleMatch("GET /*/*", request)) { - trackRequest("GET"); + trackRequest("GetObject"); } else if (isMultiPartUpload(request)) { - trackRequest("POST"); + trackRequest("PutMultipartObject"); } else if (Regex.simpleMatch("PUT /*/*", request)) { - trackRequest("PUT"); + trackRequest("PutObject"); } } diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java index e32a7dd6fd2cc..cd9b4b5763cb2 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java @@ -210,10 +210,10 @@ static class Stats { Map toMap() { final Map results = new HashMap<>(); - results.put("GET", getCount.get()); - results.put("LIST", listCount.get()); - results.put("PUT", putCount.get()); - results.put("POST", postCount.get()); + results.put("GetObject", getCount.get()); + results.put("ListObjects", listCount.get()); + results.put("PutObject", putCount.get()); + results.put("PutMultipartObject", postCount.get()); return results; } } diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 93f76710ef0cc..0ff798df024ce 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -44,7 +44,7 @@ import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.ShardGenerations; -import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotsService; @@ -52,6 +52,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.util.Collection; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -69,7 +70,7 @@ *
{@code compress}
If set to true metadata files will be stored compressed. Defaults to false.
* */ -class S3Repository extends BlobStoreRepository { +class S3Repository extends MeteredBlobStoreRepository { private static final Logger logger = LogManager.getLogger(S3Repository.class); private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(logger.getName()); @@ -214,7 +215,12 @@ class S3Repository extends BlobStoreRepository { final S3Service service, final ClusterService clusterService, final RecoverySettings recoverySettings) { - super(metadata, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService, recoverySettings); + super(metadata, + COMPRESS_SETTING.get(metadata.settings()), + namedXContentRegistry, + clusterService, + recoverySettings, + buildLocation(metadata)); this.service = service; this.repositoryMetadata = metadata; @@ -265,6 +271,11 @@ class S3Repository extends BlobStoreRepository { storageClass); } + private static Map buildLocation(RepositoryMetadata metadata) { + return org.elasticsearch.common.collect.Map.of("base_path", BASE_PATH_SETTING.get(metadata.settings()), + "bucket", BUCKET_SETTING.get(metadata.settings())); + } + /** * Holds a reference to delayed repository operation {@link Scheduler.Cancellable} so it can be cancelled should the repository be * closed concurrently. diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index 09f0c10ae48c8..4731bbdd1527a 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -44,9 +44,12 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -57,6 +60,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; import java.util.Set; /** @@ -66,6 +71,12 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C private static final Logger logger = LogManager.getLogger(RepositoriesService.class); + public static final Setting REPOSITORIES_STATS_ARCHIVE_RETENTION_PERIOD = + Setting.positiveTimeSetting("repositories.stats.archive.retention_period", TimeValue.timeValueHours(2), Setting.Property.NodeScope); + + public static final Setting REPOSITORIES_STATS_ARCHIVE_MAX_ARCHIVED_STATS = + Setting.intSetting("repositories.stats.archive.max_archived_stats", 100, 0, Setting.Property.NodeScope); + private final Map typesRegistry; private final Map internalTypesRegistry; @@ -77,6 +88,8 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C private final Map internalRepositories = ConcurrentCollections.newConcurrentMap(); private volatile Map repositories = Collections.emptyMap(); + private final RepositoriesStatsArchive repositoriesStatsArchive; + public RepositoriesService(Settings settings, ClusterService clusterService, TransportService transportService, Map typesRegistry, Map internalTypesRegistry, @@ -93,6 +106,9 @@ public RepositoriesService(Settings settings, ClusterService clusterService, Tra } } this.verifyAction = new VerifyNodeRepositoryAction(transportService, clusterService, this); + this.repositoriesStatsArchive = new RepositoriesStatsArchive(REPOSITORIES_STATS_ARCHIVE_RETENTION_PERIOD.get(settings), + REPOSITORIES_STATS_ARCHIVE_MAX_ARCHIVED_STATS.get(settings), + threadPool::relativeTimeInMillis); } /** @@ -316,7 +332,9 @@ public void applyClusterState(ClusterChangedEvent event) { for (Map.Entry entry : repositories.entrySet()) { if (newMetadata == null || newMetadata.repository(entry.getKey()) == null) { logger.debug("unregistering repository [{}]", entry.getKey()); - closeRepository(entry.getValue()); + Repository repository = entry.getValue(); + closeRepository(repository); + archiveRepositoryStats(repository, state.version()); } else { survivors.put(entry.getKey(), entry.getValue()); } @@ -335,6 +353,7 @@ public void applyClusterState(ClusterChangedEvent event) { // Previous version is different from the version in settings logger.debug("updating repository [{}]", repositoryMetadata.name()); closeRepository(repository); + archiveRepositoryStats(repository, state.version()); repository = null; try { repository = createRepository(repositoryMetadata, typesRegistry); @@ -405,6 +424,27 @@ public Repository repository(String repositoryName) { throw new RepositoryMissingException(repositoryName); } + public List repositoriesStats() { + List archivedRepoStats = repositoriesStatsArchive.getArchivedStats(); + List activeRepoStats = getRepositoryStatsForActiveRepositories(); + + List repositoriesStats = new ArrayList<>(archivedRepoStats); + repositoriesStats.addAll(activeRepoStats); + return repositoriesStats; + } + + private List getRepositoryStatsForActiveRepositories() { + return Stream.concat(repositories.values().stream(), internalRepositories.values().stream()) + .filter(r -> r instanceof MeteredBlobStoreRepository) + .map(r -> (MeteredBlobStoreRepository) r) + .map(MeteredBlobStoreRepository::statsSnapshot) + .collect(Collectors.toList()); + } + + public List clearRepositoriesStatsArchive(long maxVersionToClear) { + return repositoriesStatsArchive.clear(maxVersionToClear); + } + public void registerInternalRepository(String name, String type) { RepositoryMetadata metadata = new RepositoryMetadata(name, type, Settings.EMPTY); Repository repository = internalRepositories.computeIfAbsent(name, (n) -> { @@ -435,6 +475,15 @@ private void closeRepository(Repository repository) { repository.close(); } + private void archiveRepositoryStats(Repository repository, long clusterStateVersion) { + if (repository instanceof MeteredBlobStoreRepository) { + RepositoryStatsSnapshot stats = ((MeteredBlobStoreRepository) repository).statsSnapshotForArchival(clusterStateVersion); + if (repositoriesStatsArchive.archive(stats) == false) { + logger.warn("Unable to archive the repository stats [{}] as the archive is full.", stats); + } + } + } + /** * Creates repository holder. This method starts the repository */ diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesStatsArchive.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesStatsArchive.java new file mode 100644 index 0000000000000..d0af94b4155fd --- /dev/null +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesStatsArchive.java @@ -0,0 +1,121 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.unit.TimeValue; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.Iterator; +import java.util.List; +import java.util.function.LongSupplier; +import java.util.stream.Collectors; + +public final class RepositoriesStatsArchive { + private static final Logger logger = LogManager.getLogger(RepositoriesStatsArchive.class); + + private final TimeValue retentionPeriod; + private final int maxCapacity; + private final LongSupplier relativeTimeSupplier; + private final Deque archive = new ArrayDeque<>(); + + public RepositoriesStatsArchive(TimeValue retentionPeriod, + int maxCapacity, + LongSupplier relativeTimeSupplier) { + this.retentionPeriod = retentionPeriod; + this.maxCapacity = maxCapacity; + this.relativeTimeSupplier = relativeTimeSupplier; + } + + /** + * Archives the specified repository stats snapshot into the archive + * if it's possible without violating the capacity constraints. + * + * @return {@code true} if the repository stats were archived, {@code false} otherwise. + */ + synchronized boolean archive(final RepositoryStatsSnapshot repositoryStats) { + assert containsRepositoryStats(repositoryStats) == false + : "A repository with ephemeral id " + repositoryStats.getRepositoryInfo().ephemeralId + " is already archived"; + assert repositoryStats.isArchived(); + + evict(); + + if (archive.size() >= maxCapacity) { + return false; + } + + return archive.add(new ArchiveEntry(repositoryStats, relativeTimeSupplier.getAsLong())); + } + + synchronized List getArchivedStats() { + evict(); + return archive.stream().map(e -> e.repositoryStatsSnapshot).collect(Collectors.toList()); + } + + /** + * Clears the archive, returning the valid archived entries up until that point. + * + * @return the repository stats that were stored before clearing the archive. + */ + synchronized List clear(long maxVersionToClear) { + List clearedStats = new ArrayList<>(); + Iterator iterator = archive.iterator(); + while (iterator.hasNext()) { + RepositoryStatsSnapshot statsSnapshot = iterator.next().repositoryStatsSnapshot; + if (statsSnapshot.getClusterVersion() <= maxVersionToClear) { + clearedStats.add(statsSnapshot); + iterator.remove(); + } + } + logger.debug("RepositoriesStatsArchive have been cleared. Removed stats: [{}]", clearedStats); + return clearedStats; + } + + private void evict() { + ArchiveEntry entry; + while ((entry = archive.peek()) != null && entry.ageInMillis(relativeTimeSupplier) >= retentionPeriod.getMillis()) { + ArchiveEntry removedEntry = archive.poll(); + logger.debug("Evicting repository stats [{}]", removedEntry.repositoryStatsSnapshot); + } + } + + private boolean containsRepositoryStats(RepositoryStatsSnapshot repositoryStats) { + return archive.stream() + .anyMatch(entry -> + entry.repositoryStatsSnapshot.getRepositoryInfo().ephemeralId.equals(repositoryStats.getRepositoryInfo().ephemeralId)); + } + + private static class ArchiveEntry { + private final RepositoryStatsSnapshot repositoryStatsSnapshot; + private final long createdAtMillis; + + private ArchiveEntry(RepositoryStatsSnapshot repositoryStatsSnapshot, long createdAtMillis) { + this.repositoryStatsSnapshot = repositoryStatsSnapshot; + this.createdAtMillis = createdAtMillis; + } + + private long ageInMillis(LongSupplier relativeTimeInMillis) { + return Math.max(0, relativeTimeInMillis.getAsLong() - createdAtMillis); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryInfo.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryInfo.java new file mode 100644 index 0000000000000..8d2612ba70b04 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryInfo.java @@ -0,0 +1,132 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +public final class RepositoryInfo implements Writeable, ToXContentFragment { + public final String ephemeralId; + public final String name; + public final String type; + public final Map location; + public final long startedAt; + @Nullable + public final Long stoppedAt; + + public RepositoryInfo(String ephemeralId, + String name, + String type, + Map location, + long startedAt) { + this(ephemeralId, name, type, location, startedAt, null); + } + + public RepositoryInfo(String ephemeralId, + String name, + String type, + Map location, + long startedAt, + @Nullable Long stoppedAt) { + this.ephemeralId = ephemeralId; + this.name = name; + this.type = type; + this.location = location; + this.startedAt = startedAt; + if (stoppedAt != null && startedAt > stoppedAt) { + throw new IllegalArgumentException("createdAt must be before or equal to stoppedAt"); + } + this.stoppedAt = stoppedAt; + } + + public RepositoryInfo(StreamInput in) throws IOException { + this.ephemeralId = in.readString(); + this.name = in.readString(); + this.type = in.readString(); + this.location = in.readMap(StreamInput::readString, StreamInput::readString); + this.startedAt = in.readLong(); + this.stoppedAt = in.readOptionalLong(); + } + + public RepositoryInfo stopped(long stoppedAt) { + assert isStopped() == false : "The repository is already stopped"; + + return new RepositoryInfo(ephemeralId, name, type, location, startedAt, stoppedAt); + } + + public boolean isStopped() { + return stoppedAt != null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(ephemeralId); + out.writeString(name); + out.writeString(type); + out.writeMap(location, StreamOutput::writeString, StreamOutput::writeString); + out.writeLong(startedAt); + out.writeOptionalLong(stoppedAt); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field("repository_name", name); + builder.field("repository_type", type); + builder.field("repository_location", location); + builder.field("repository_ephemeral_id", ephemeralId); + builder.field("repository_started_at", startedAt); + if (stoppedAt != null) { + builder.field("repository_stopped_at", stoppedAt); + } + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + RepositoryInfo that = (RepositoryInfo) o; + return ephemeralId.equals(that.ephemeralId) && + name.equals(that.name) && + type.equals(that.type) && + location.equals(that.location) && + startedAt == that.startedAt && + Objects.equals(stoppedAt, that.stoppedAt); + } + + @Override + public int hashCode() { + return Objects.hash(ephemeralId, name, type, location, startedAt, stoppedAt); + } + + @Override + public String toString() { + return Strings.toString(this); + } +} diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryStats.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryStats.java index 50a8b46630179..d6fc680946b38 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoryStats.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryStats.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Objects; public class RepositoryStats implements Writeable { @@ -55,4 +56,24 @@ public RepositoryStats merge(RepositoryStats otherStats) { public void writeTo(StreamOutput out) throws IOException { out.writeMap(requestCounts, StreamOutput::writeString, StreamOutput::writeLong); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + RepositoryStats that = (RepositoryStats) o; + return requestCounts.equals(that.requestCounts); + } + + @Override + public int hashCode() { + return Objects.hash(requestCounts); + } + + @Override + public String toString() { + return "RepositoryStats{" + + "requestCounts=" + requestCounts + + '}'; + } } diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryStatsSnapshot.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryStatsSnapshot.java new file mode 100644 index 0000000000000..55a13e5fde0f7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryStatsSnapshot.java @@ -0,0 +1,114 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +public final class RepositoryStatsSnapshot implements Writeable, ToXContentObject { + public static final long UNKNOWN_CLUSTER_VERSION = -1; + private final RepositoryInfo repositoryInfo; + private final RepositoryStats repositoryStats; + private final long clusterVersion; + private final boolean archived; + + public RepositoryStatsSnapshot(RepositoryInfo repositoryInfo, + RepositoryStats repositoryStats, + long clusterVersion, + boolean archived) { + assert archived != (clusterVersion == UNKNOWN_CLUSTER_VERSION); + this.repositoryInfo = repositoryInfo; + this.repositoryStats = repositoryStats; + this.clusterVersion = clusterVersion; + this.archived = archived; + } + + public RepositoryStatsSnapshot(StreamInput in) throws IOException { + this.repositoryInfo = new RepositoryInfo(in); + this.repositoryStats = new RepositoryStats(in); + this.clusterVersion = in.readLong(); + this.archived = in.readBoolean(); + } + + public RepositoryInfo getRepositoryInfo() { + return repositoryInfo; + } + + public RepositoryStats getRepositoryStats() { + return repositoryStats; + } + + public boolean isArchived() { + return archived; + } + + public long getClusterVersion() { + return clusterVersion; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + repositoryInfo.writeTo(out); + repositoryStats.writeTo(out); + out.writeLong(clusterVersion); + out.writeBoolean(archived); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + repositoryInfo.toXContent(builder, params); + builder.field("request_counts", repositoryStats.requestCounts); + builder.field("archived", archived); + if (archived) { + builder.field("cluster_version", clusterVersion); + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + RepositoryStatsSnapshot that = (RepositoryStatsSnapshot) o; + return repositoryInfo.equals(that.repositoryInfo) && + repositoryStats.equals(that.repositoryStats) && + clusterVersion == that.clusterVersion && + archived == that.archived; + } + + @Override + public int hashCode() { + return Objects.hash(repositoryInfo, repositoryStats, clusterVersion, archived); + } + + @Override + public String toString() { + return Strings.toString(this); + } +} diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/MeteredBlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/MeteredBlobStoreRepository.java new file mode 100644 index 0000000000000..f87608c92692c --- /dev/null +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/MeteredBlobStoreRepository.java @@ -0,0 +1,59 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories.blobstore; + +import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.indices.recovery.RecoverySettings; +import org.elasticsearch.repositories.RepositoryInfo; +import org.elasticsearch.repositories.RepositoryStatsSnapshot; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.Map; + +public abstract class MeteredBlobStoreRepository extends BlobStoreRepository { + private final RepositoryInfo repositoryInfo; + + public MeteredBlobStoreRepository(RepositoryMetadata metadata, + boolean compress, + NamedXContentRegistry namedXContentRegistry, + ClusterService clusterService, + RecoverySettings recoverySettings, + Map location) { + super(metadata, compress, namedXContentRegistry, clusterService, recoverySettings); + ThreadPool threadPool = clusterService.getClusterApplierService().threadPool(); + this.repositoryInfo = new RepositoryInfo(UUIDs.randomBase64UUID(), + metadata.name(), + metadata.type(), + location, + threadPool.absoluteTimeInMillis()); + } + + public RepositoryStatsSnapshot statsSnapshot() { + return new RepositoryStatsSnapshot(repositoryInfo, stats(), RepositoryStatsSnapshot.UNKNOWN_CLUSTER_VERSION, false); + } + + public RepositoryStatsSnapshot statsSnapshotForArchival(long clusterVersion) { + RepositoryInfo stoppedRepoInfo = repositoryInfo.stopped(threadPool.absoluteTimeInMillis()); + return new RepositoryStatsSnapshot(stoppedRepoInfo, stats(), clusterVersion, true); + } +} diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 3607fadf7a951..dceb2b3c6ab10 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -23,23 +23,32 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.test.ESTestCase; @@ -54,7 +63,9 @@ import java.util.function.Consumer; import java.util.function.Function; +import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class RepositoriesServiceTests extends ESTestCase { @@ -68,8 +79,16 @@ public void setUp() throws Exception { TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundAddress -> DiscoveryNode.createLocal(Settings.EMPTY, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null, Collections.emptySet()); + final ClusterApplierService clusterApplierService = mock(ClusterApplierService.class); + when(clusterApplierService.threadPool()).thenReturn(threadPool); + final ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService); + Map typesRegistry = + org.elasticsearch.common.collect.Map.of(TestRepository.TYPE, TestRepository::new, + MeteredRepositoryTypeA.TYPE, metadata -> new MeteredRepositoryTypeA(metadata, clusterService), + MeteredRepositoryTypeB.TYPE, metadata -> new MeteredRepositoryTypeB(metadata, clusterService)); repositoriesService = new RepositoriesService(Settings.EMPTY, mock(ClusterService.class), - transportService, Collections.emptyMap(), Collections.singletonMap(TestRepository.TYPE, TestRepository::new), threadPool); + transportService, typesRegistry, typesRegistry, threadPool); repositoriesService.start(); } @@ -115,6 +134,46 @@ public void testRegisterRejectsInvalidRepositoryNames() { } } + public void testRepositoriesStatsCanHaveTheSameNameAndDifferentTypeOverTime() { + String repoName = "name"; + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)); + + ClusterState clusterStateWithRepoTypeA = createClusterStateWithRepo(repoName, MeteredRepositoryTypeA.TYPE); + + repositoriesService.applyClusterState(new ClusterChangedEvent("new repo", clusterStateWithRepoTypeA, emptyState())); + assertThat(repositoriesService.repositoriesStats().size(), equalTo(1)); + + repositoriesService.applyClusterState(new ClusterChangedEvent("new repo", emptyState(), clusterStateWithRepoTypeA)); + assertThat(repositoriesService.repositoriesStats().size(), equalTo(1)); + + ClusterState clusterStateWithRepoTypeB = createClusterStateWithRepo(repoName, MeteredRepositoryTypeB.TYPE); + repositoriesService.applyClusterState(new ClusterChangedEvent("new repo", clusterStateWithRepoTypeB, emptyState())); + + List repositoriesStats = repositoriesService.repositoriesStats(); + assertThat(repositoriesStats.size(), equalTo(2)); + RepositoryStatsSnapshot repositoryStatsTypeA = repositoriesStats.get(0); + assertThat(repositoryStatsTypeA.getRepositoryInfo().type, equalTo(MeteredRepositoryTypeA.TYPE)); + assertThat(repositoryStatsTypeA.getRepositoryStats(), equalTo(MeteredRepositoryTypeA.STATS)); + + RepositoryStatsSnapshot repositoryStatsTypeB = repositoriesStats.get(1); + assertThat(repositoryStatsTypeB.getRepositoryInfo().type, equalTo(MeteredRepositoryTypeB.TYPE)); + assertThat(repositoryStatsTypeB.getRepositoryStats(), equalTo(MeteredRepositoryTypeB.STATS)); + } + + private ClusterState createClusterStateWithRepo(String repoName, String repoType) { + ClusterState.Builder state = ClusterState.builder(new ClusterName("test")); + Metadata.Builder mdBuilder = Metadata.builder(); + mdBuilder.putCustom(RepositoriesMetadata.TYPE, + new RepositoriesMetadata(Collections.singletonList(new RepositoryMetadata(repoName, repoType, Settings.EMPTY)))); + state.metadata(mdBuilder); + + return state.build(); + } + + private ClusterState emptyState() { + return ClusterState.builder(new ClusterName("test")).build(); + } + private void assertThrowsOnRegister(String repoName) { PutRepositoryRequest request = new PutRepositoryRequest(repoName); expectThrows(RepositoryException.class, () -> repositoriesService.registerRepository(request, null)); @@ -263,4 +322,62 @@ public void close() { isClosed = true; } } + + private static class MeteredRepositoryTypeA extends MeteredBlobStoreRepository { + private static final String TYPE = "type-a"; + private static final RepositoryStats STATS = new RepositoryStats(org.elasticsearch.common.collect.Map.of("GET", 10L)); + + private MeteredRepositoryTypeA(RepositoryMetadata metadata, ClusterService clusterService) { + super(metadata, + false, + mock(NamedXContentRegistry.class), + clusterService, + mock(RecoverySettings.class), + org.elasticsearch.common.collect.Map.of("bucket", "bucket-a")); + } + + @Override + protected BlobStore createBlobStore() { + return mock(BlobStore.class); + } + + @Override + public RepositoryStats stats() { + return STATS; + } + + @Override + public BlobPath basePath() { + return BlobPath.cleanPath(); + } + } + + private static class MeteredRepositoryTypeB extends MeteredBlobStoreRepository { + private static final String TYPE = "type-b"; + private static final RepositoryStats STATS = new RepositoryStats(org.elasticsearch.common.collect.Map.of("LIST", 20L)); + + private MeteredRepositoryTypeB(RepositoryMetadata metadata, ClusterService clusterService) { + super(metadata, + false, + mock(NamedXContentRegistry.class), + clusterService, + mock(RecoverySettings.class), + org.elasticsearch.common.collect.Map.of("bucket", "bucket-b")); + } + + @Override + protected BlobStore createBlobStore() { + return mock(BlobStore.class); + } + + @Override + public RepositoryStats stats() { + return STATS; + } + + @Override + public BlobPath basePath() { + return BlobPath.cleanPath(); + } + } } diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesStatsArchiveTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesStatsArchiveTests.java new file mode 100644 index 0000000000000..aa95229a947fb --- /dev/null +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesStatsArchiveTests.java @@ -0,0 +1,118 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories; + +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; + +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import static org.hamcrest.Matchers.equalTo; + +public class RepositoriesStatsArchiveTests extends ESTestCase { + public void testStatsAreEvictedOnceTheyAreOlderThanRetentionPeriod() { + int retentionTimeInMillis = randomIntBetween(100, 1000); + + AtomicLong fakeRelativeClock = new AtomicLong(); + RepositoriesStatsArchive repositoriesStatsArchive = + new RepositoriesStatsArchive(TimeValue.timeValueMillis(retentionTimeInMillis), + 100, + fakeRelativeClock::get); + + for (int i = 0; i < randomInt(10); i++) { + RepositoryStatsSnapshot repoStats = createRepositoryStats(RepositoryStats.EMPTY_STATS); + repositoriesStatsArchive.archive(repoStats); + } + + fakeRelativeClock.set(retentionTimeInMillis * 2); + int statsToBeRetainedCount = randomInt(10); + for (int i = 0; i < statsToBeRetainedCount; i++) { + RepositoryStatsSnapshot repoStats = + createRepositoryStats(new RepositoryStats(org.elasticsearch.common.collect.Map.of("GET", 10L))); + repositoriesStatsArchive.archive(repoStats); + } + + List archivedStats = repositoriesStatsArchive.getArchivedStats(); + assertThat(archivedStats.size(), equalTo(statsToBeRetainedCount)); + for (RepositoryStatsSnapshot repositoryStatsSnapshot : archivedStats) { + assertThat(repositoryStatsSnapshot.getRepositoryStats().requestCounts, + equalTo(org.elasticsearch.common.collect.Map.of("GET", 10L))); + } + } + + public void testStatsAreRejectedIfTheArchiveIsFull() { + int retentionTimeInMillis = randomIntBetween(100, 1000); + + AtomicLong fakeRelativeClock = new AtomicLong(); + RepositoriesStatsArchive repositoriesStatsArchive = + new RepositoriesStatsArchive(TimeValue.timeValueMillis(retentionTimeInMillis), + 1, + fakeRelativeClock::get); + + assertTrue(repositoriesStatsArchive.archive(createRepositoryStats(RepositoryStats.EMPTY_STATS))); + + fakeRelativeClock.set(retentionTimeInMillis * 2); + // Now there's room since the previous stats should be evicted + assertTrue(repositoriesStatsArchive.archive(createRepositoryStats(RepositoryStats.EMPTY_STATS))); + // There's no room for stats with the same creation time + assertFalse(repositoriesStatsArchive.archive(createRepositoryStats(RepositoryStats.EMPTY_STATS))); + } + + public void testClearArchive() { + int retentionTimeInMillis = randomIntBetween(100, 1000); + AtomicLong fakeRelativeClock = new AtomicLong(); + RepositoriesStatsArchive repositoriesStatsArchive = + new RepositoriesStatsArchive(TimeValue.timeValueMillis(retentionTimeInMillis), + 100, + fakeRelativeClock::get); + + int archivedStatsWithVersionZero = randomIntBetween(1, 20); + for (int i = 0; i < archivedStatsWithVersionZero; i++) { + repositoriesStatsArchive.archive(createRepositoryStats(RepositoryStats.EMPTY_STATS, 0)); + } + + int archivedStatsWithNewerVersion = randomIntBetween(1, 20); + for (int i = 0; i < archivedStatsWithNewerVersion; i++) { + repositoriesStatsArchive.archive(createRepositoryStats(RepositoryStats.EMPTY_STATS, 1)); + } + + List removedStats = repositoriesStatsArchive.clear(0L); + assertThat(removedStats.size(), equalTo(archivedStatsWithVersionZero)); + + assertThat(repositoriesStatsArchive.getArchivedStats().size(), equalTo(archivedStatsWithNewerVersion)); + } + + private RepositoryStatsSnapshot createRepositoryStats(RepositoryStats repositoryStats) { + return createRepositoryStats(repositoryStats, 0); + } + + private RepositoryStatsSnapshot createRepositoryStats(RepositoryStats repositoryStats, long clusterVersion) { + RepositoryInfo repositoryInfo = new RepositoryInfo(UUIDs.randomBase64UUID(), + randomAlphaOfLength(10), + randomAlphaOfLength(10), + org.elasticsearch.common.collect.Map.of("bucket", randomAlphaOfLength(10)), + System.currentTimeMillis(), + null); + return new RepositoryStatsSnapshot(repositoryInfo, repositoryStats, clusterVersion, true); + } + +} diff --git a/test/fixtures/azure-fixture/docker-compose.yml b/test/fixtures/azure-fixture/docker-compose.yml index 61ea9d28a560a..85e073e1803c1 100644 --- a/test/fixtures/azure-fixture/docker-compose.yml +++ b/test/fixtures/azure-fixture/docker-compose.yml @@ -17,3 +17,12 @@ services: - ./testfixtures_shared/shared:/fixture/shared ports: - "8091" + + azure-fixture-repositories-metering: + build: + context: . + dockerfile: Dockerfile + volumes: + - ./testfixtures_shared/shared:/fixture/shared + ports: + - "8091" diff --git a/test/fixtures/gcs-fixture/docker-compose.yml b/test/fixtures/gcs-fixture/docker-compose.yml index a53c4366df6dc..30a362e7caa8d 100644 --- a/test/fixtures/gcs-fixture/docker-compose.yml +++ b/test/fixtures/gcs-fixture/docker-compose.yml @@ -36,3 +36,15 @@ services: - ./testfixtures_shared/shared:/fixture/shared ports: - "80" + gcs-fixture-repositories-metering: + build: + context: . + args: + port: 80 + bucket: "bucket" + token: "o/oauth2/token" + dockerfile: Dockerfile + volumes: + - ./testfixtures_shared/shared:/fixture/shared + ports: + - "80" diff --git a/test/fixtures/s3-fixture/docker-compose.yml b/test/fixtures/s3-fixture/docker-compose.yml index 1d06334eddbd3..22d101f41c318 100644 --- a/test/fixtures/s3-fixture/docker-compose.yml +++ b/test/fixtures/s3-fixture/docker-compose.yml @@ -30,6 +30,21 @@ services: ports: - "80" + s3-fixture-repositories-metering: + build: + context: . + args: + fixtureClass: fixture.s3.S3HttpFixture + port: 80 + bucket: "bucket" + basePath: "base_path" + accessKey: "access_key" + dockerfile: Dockerfile + volumes: + - ./testfixtures_shared/shared:/fixture/shared + ports: + - "80" + s3-fixture-with-session-token: build: context: . diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java index 3f86d3a49a7e6..73f5cb889ddd4 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java @@ -49,6 +49,7 @@ import java.io.InputStream; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -135,8 +136,6 @@ public void tearDownHttpServer() { protected abstract HttpHandler createErroneousHttpHandler(HttpHandler delegate); - protected abstract List requestTypesTracked(); - /** * Test the snapshot and restore of an index which has large segments files. */ @@ -217,32 +216,25 @@ public void testRequestStats() throws Exception { Map sdkRequestCounts = repositoryStats.requestCounts; - for (String requestType : requestTypesTracked()) { - assertSDKCallsMatchMockCalls(sdkRequestCounts, requestType); - } - } + final Map mockCalls = getMockRequestCounts(); - private void assertSDKCallsMatchMockCalls(Map sdkRequestCount, String requestTye) { - final long sdkCalls = sdkRequestCount.getOrDefault(requestTye, 0L); - final long mockCalls = handlers.values().stream() - .mapToLong(h -> { - while (h instanceof DelegatingHttpHandler) { - if (h instanceof HttpStatsCollectorHandler) { - return ((HttpStatsCollectorHandler) h).getCount(requestTye); - } - h = ((DelegatingHttpHandler) h).getDelegate(); - } + String assertionErrorMsg = String.format("SDK sent [%s] calls and handler measured [%s] calls", + sdkRequestCounts, + mockCalls); - return 0L; - }).sum(); - - String assertionErrorMsg = String.format("SDK sent %d [%s] calls and handler measured %d [%s] calls", - sdkCalls, - requestTye, - mockCalls, - requestTye); + assertEquals(assertionErrorMsg, mockCalls, sdkRequestCounts); + } - assertEquals(assertionErrorMsg, mockCalls, sdkCalls); + private Map getMockRequestCounts() { + for (HttpHandler h : handlers.values()) { + while (h instanceof DelegatingHttpHandler) { + if (h instanceof HttpStatsCollectorHandler) { + return ((HttpStatsCollectorHandler) h).getOperationsCount(); + } + h = ((DelegatingHttpHandler) h).getDelegate(); + } + } + return Collections.emptyMap(); } protected static String httpServerUrl() { @@ -352,8 +344,8 @@ public HttpHandler getDelegate() { return delegate; } - synchronized long getCount(final String requestType) { - return operationCount.getOrDefault(requestType, 0L); + synchronized Map getOperationsCount() { + return org.elasticsearch.common.collect.Map.copyOf(operationCount); } protected synchronized void trackRequest(final String requestType) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index a253c7e613fa6..2179e40ea7dae 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -23,6 +23,8 @@ import org.apache.http.HttpHost; import org.apache.http.HttpStatus; import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; import org.apache.http.message.BasicHeader; import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.http.ssl.SSLContexts; @@ -31,6 +33,7 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; +import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RequestOptions.Builder; @@ -1220,6 +1223,10 @@ protected static Map getAlias(final String index, final String a protected static Map getAsMap(final String endpoint) throws IOException { Response response = client().performRequest(new Request("GET", endpoint)); + return responseAsMap(response); + } + + protected static Map responseAsMap(Response response) throws IOException { XContentType entityContentType = XContentType.fromMediaTypeOrFormat(response.getEntity().getContentType().getValue()); Map responseEntity = XContentHelper.convertToMap(entityContentType.xContent(), response.getEntity().getContent(), false); @@ -1227,6 +1234,52 @@ protected static Map getAsMap(final String endpoint) throws IOEx return responseEntity; } + protected static void registerRepository(String repository, String type, boolean verify, Settings settings) throws IOException { + final Request request = new Request(HttpPut.METHOD_NAME, "_snapshot/" + repository); + request.addParameter("verify", Boolean.toString(verify)); + request.setJsonEntity(Strings.toString(new PutRepositoryRequest(repository).type(type).settings(settings))); + + final Response response = client().performRequest(request); + assertAcked("Failed to create repository [" + repository + "] of type [" + type + "]: " + response, response); + } + + protected static void createSnapshot(String repository, String snapshot, boolean waitForCompletion) throws IOException { + final Request request = new Request(HttpPut.METHOD_NAME, "_snapshot/" + repository + '/' + snapshot); + request.addParameter("wait_for_completion", Boolean.toString(waitForCompletion)); + + final Response response = client().performRequest(request); + assertThat( + "Failed to create snapshot [" + snapshot + "] in repository [" + repository + "]: " + response, + response.getStatusLine().getStatusCode(), + equalTo(RestStatus.OK.getStatus()) + ); + } + + protected static void restoreSnapshot(String repository, String snapshot, boolean waitForCompletion) throws IOException { + final Request request = new Request(HttpPost.METHOD_NAME, "_snapshot/" + repository + '/' + snapshot + "/_restore"); + request.addParameter("wait_for_completion", Boolean.toString(waitForCompletion)); + + final Response response = client().performRequest(request); + assertThat( + "Failed to restore snapshot [" + snapshot + "] from repository [" + repository + "]: " + response, + response.getStatusLine().getStatusCode(), + equalTo(RestStatus.OK.getStatus()) + ); + } + + @SuppressWarnings("unchecked") + private static void assertAcked(String message, Response response) throws IOException { + final int responseStatusCode = response.getStatusLine().getStatusCode(); + assertThat( + message + ": expecting response code [200] but got [" + responseStatusCode + ']', + responseStatusCode, + equalTo(RestStatus.OK.getStatus()) + ); + final Map responseAsMap = responseAsMap(response); + Boolean acknowledged = (Boolean) XContentMapValues.extractValue(responseAsMap, "acknowledged"); + assertThat(message + ": response is not acknowledged", acknowledged, equalTo(Boolean.TRUE)); + } + /** * Is this template one that is automatically created by xpack? */ diff --git a/x-pack/plugin/repositories-metering-api/build.gradle b/x-pack/plugin/repositories-metering-api/build.gradle new file mode 100644 index 0000000000000..23c911c15c760 --- /dev/null +++ b/x-pack/plugin/repositories-metering-api/build.gradle @@ -0,0 +1,46 @@ +evaluationDependsOn(xpackModule('core')) + +apply plugin: 'elasticsearch.esplugin' +esplugin { + name 'repositories-metering-api' + description 'Repositories metering API' + classname 'org.elasticsearch.xpack.repositories.metering.RepositoriesMeteringPlugin' + extendedPlugins = ['x-pack-core'] +} +archivesBaseName = 'x-pack-repositories-metering-api' + +dependencies { + compileOnly project(path: xpackModule('core'), configuration: 'default') + testImplementation project(path: xpackModule('core'), configuration: 'testArtifacts') +} + +// xpack modules are installed in real clusters as the meta plugin, so +// installing them as individual plugins for integ tests doesn't make sense, +// so we disable integ tests +integTest.enabled = false + +// add all sub-projects of the qa sub-project +gradle.projectsEvaluated { + project.subprojects + .find { it.path == project.path + ":qa" } + .subprojects + .findAll { it.path.startsWith(project.path + ":qa") } + .each { check.dependsOn it.check } +} + +configurations { + testArtifacts.extendsFrom testRuntime + testArtifacts.extendsFrom testImplementation +} + +task testJar(type: Jar) { + appendix 'test' + from sourceSets.test.output +} + +artifacts { + testArtifacts testJar +} + +test { +} diff --git a/x-pack/plugin/repositories-metering-api/qa/azure/build.gradle b/x-pack/plugin/repositories-metering-api/qa/azure/build.gradle new file mode 100644 index 0000000000000..560d6fad19364 --- /dev/null +++ b/x-pack/plugin/repositories-metering-api/qa/azure/build.gradle @@ -0,0 +1,99 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import org.elasticsearch.gradle.info.BuildParams +import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE + +apply plugin: 'elasticsearch.standalone-rest-test' +apply plugin: 'elasticsearch.rest-test' +apply plugin: 'elasticsearch.rest-resources' + +final Project fixture = project(':test:fixtures:azure-fixture') +final Project repositoryPlugin = project(':plugins:repository-azure') + +dependencies { + testImplementation project(path: xpackModule('repositories-metering-api'), configuration: 'testArtifacts') + testImplementation repositoryPlugin +} + +restResources { + restApi { + includeCore 'indices', 'bulk', 'snapshot', 'nodes', '_common' + includeXpack 'repositories-metering-api' + } +} + +boolean useFixture = false +String azureAccount = System.getenv("azure_storage_account") +String azureKey = System.getenv("azure_storage_key") +String azureContainer = System.getenv("azure_storage_container") +String azureBasePath = System.getenv("azure_storage_base_path") +String azureSasToken = System.getenv("azure_storage_sas_token") + +if (!azureAccount && !azureKey && !azureContainer && !azureBasePath && !azureSasToken) { + azureAccount = 'azure_integration_test_account' + azureKey = 'YXp1cmVfaW50ZWdyYXRpb25fdGVzdF9rZXk=' // The key is "azure_integration_test_key" encoded using base64 + azureContainer = 'container' + azureBasePath = '' + azureSasToken = '' + useFixture = true + +} + +if (useFixture) { + apply plugin: 'elasticsearch.test.fixtures' + testFixtures.useFixture(fixture.path, 'azure-fixture-repositories-metering') +} + +integTest { + dependsOn repositoryPlugin.bundlePlugin + systemProperty 'test.azure.container', azureContainer + nonInputProperties.systemProperty 'test.azure.base_path', azureBasePath + "_repositories_metering_tests_" + BuildParams.testSeed +} + +testClusters.integTest { + testDistribution = 'DEFAULT' + plugin repositoryPlugin.bundlePlugin.archiveFile + + keystore 'azure.client.repositories_metering.account', azureAccount + if (azureKey != null && azureKey.isEmpty() == false) { + keystore 'azure.client.repositories_metering.key', azureKey + } + if (azureSasToken != null && azureSasToken.isEmpty() == false) { + keystore 'azure.client.repositories_metering.sas_token', azureSasToken + } + + if (useFixture) { + def fixtureAddress = { fixtureName -> + assert useFixture: 'closure should not be used without a fixture' + int ephemeralPort = fixture.postProcessFixture.ext."test.fixtures.${fixtureName}.tcp.8091" + assert ephemeralPort > 0 + '127.0.0.1:' + ephemeralPort + } + setting 'azure.client.repositories_metering.endpoint_suffix', + { "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=http://${-> fixtureAddress('azure-fixture-repositories-metering')}" }, IGNORE_VALUE + + } else { + println "Using an external service to test " + project.name + } +} + +task azureThirdPartyTest { + dependsOn integTest +} + diff --git a/x-pack/plugin/repositories-metering-api/qa/azure/src/test/java/org/elasticsearch/xpack/repositories/metering/azure/AzureRepositoriesMeteringIT.java b/x-pack/plugin/repositories-metering-api/qa/azure/src/test/java/org/elasticsearch/xpack/repositories/metering/azure/AzureRepositoriesMeteringIT.java new file mode 100644 index 0000000000000..0941c67f7e6dd --- /dev/null +++ b/x-pack/plugin/repositories-metering-api/qa/azure/src/test/java/org/elasticsearch/xpack/repositories/metering/azure/AzureRepositoriesMeteringIT.java @@ -0,0 +1,54 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.repositories.metering.azure; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.xpack.repositories.metering.AbstractRepositoriesMeteringAPIRestTestCase; + +import java.util.List; +import java.util.Map; + +public class AzureRepositoriesMeteringIT extends AbstractRepositoriesMeteringAPIRestTestCase { + + @Override + protected String repositoryType() { + return "azure"; + } + + @Override + protected Map repositoryLocation() { + return org.elasticsearch.common.collect.Map.of( + "container", + getProperty("test.azure.container"), + "base_path", + getProperty("test.azure.base_path") + ); + } + + @Override + protected Settings repositorySettings() { + final String container = getProperty("test.azure.container"); + + final String basePath = getProperty("test.azure.base_path"); + + return Settings.builder().put("client", "repositories_metering").put("container", container).put("base_path", basePath).build(); + } + + @Override + protected Settings updatedRepositorySettings() { + return Settings.builder().put(repositorySettings()).put("azure.client.repositories_metering.max_retries", 5).build(); + } + + @Override + protected List readCounterKeys() { + return org.elasticsearch.common.collect.List.of("GetBlob", "GetBlobProperties", "ListBlobs"); + } + + @Override + protected List writeCounterKeys() { + return org.elasticsearch.common.collect.List.of("PutBlob"); + } +} diff --git a/x-pack/plugin/repositories-metering-api/qa/build.gradle b/x-pack/plugin/repositories-metering-api/qa/build.gradle new file mode 100644 index 0000000000000..53a1915bb2a07 --- /dev/null +++ b/x-pack/plugin/repositories-metering-api/qa/build.gradle @@ -0,0 +1,6 @@ +apply plugin: 'elasticsearch.build' +test.enabled = false + +dependencies { + api project(':test:framework') +} diff --git a/x-pack/plugin/repositories-metering-api/qa/gcs/build.gradle b/x-pack/plugin/repositories-metering-api/qa/gcs/build.gradle new file mode 100644 index 0000000000000..df34fbc0e8312 --- /dev/null +++ b/x-pack/plugin/repositories-metering-api/qa/gcs/build.gradle @@ -0,0 +1,130 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import org.elasticsearch.gradle.info.BuildParams +import org.elasticsearch.gradle.MavenFilteringHack + +import java.nio.file.Files +import java.security.KeyPair +import java.security.KeyPairGenerator + +import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE + +apply plugin: 'elasticsearch.standalone-rest-test' +apply plugin: 'elasticsearch.rest-test' +apply plugin: 'elasticsearch.rest-resources' + +final Project fixture = project(':test:fixtures:gcs-fixture') +final Project repositoryPlugin = project(':plugins:repository-gcs') + +dependencies { + testImplementation project(path: xpackModule('repositories-metering-api'), configuration: 'testArtifacts') + testImplementation repositoryPlugin +} + +restResources { + restApi { + includeCore 'indices', 'bulk', 'snapshot', 'nodes', '_common' + includeXpack 'repositories-metering-api' + } +} + +boolean useFixture = false + +String gcsServiceAccount = System.getenv("google_storage_service_account") +String gcsBucket = System.getenv("google_storage_bucket") +String gcsBasePath = System.getenv("google_storage_base_path") + +File serviceAccountFile = null +if (!gcsServiceAccount && !gcsBucket && !gcsBasePath) { + serviceAccountFile = new File(project.buildDir, 'generated-resources/service_account_test.json') + gcsBucket = 'bucket' + gcsBasePath = 'integration_test' + useFixture = true +} else if (!gcsServiceAccount || !gcsBucket || !gcsBasePath) { + throw new IllegalArgumentException("not all options specified to run tests against external GCS service are present") +} else { + serviceAccountFile = new File(gcsServiceAccount) +} + +/** A service account file that points to the Google Cloud Storage service emulated by the fixture **/ +task createServiceAccountFile() { + doLast { + KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA") + keyPairGenerator.initialize(1024) + KeyPair keyPair = keyPairGenerator.generateKeyPair() + String encodedKey = Base64.getEncoder().encodeToString(keyPair.private.getEncoded()) + + serviceAccountFile.parentFile.mkdirs() + serviceAccountFile.setText("{\n" + + ' "type": "service_account",\n' + + ' "project_id": "integration_test",\n' + + ' "private_key_id": "' + UUID.randomUUID().toString() + '",\n' + + ' "private_key": "-----BEGIN PRIVATE KEY-----\\n' + encodedKey + '\\n-----END PRIVATE KEY-----\\n",\n' + + ' "client_email": "integration_test@appspot.gserviceaccount.com",\n' + + ' "client_id": "123456789101112130594"\n' + + '}', 'UTF-8') + } +} + +def fixtureAddress = { f -> + assert useFixture: 'closure should not be used without a fixture' + int ephemeralPort = project(':test:fixtures:gcs-fixture').postProcessFixture.ext."test.fixtures.${f}.tcp.80" + assert ephemeralPort > 0 + 'http://127.0.0.1:' + ephemeralPort +} + +Map expansions = [ + 'bucket' : gcsBucket, + 'base_path': gcsBasePath + "_integration_tests" +] + +processTestResources { + inputs.properties(expansions) + MavenFilteringHack.filter(it, expansions) +} + +if (useFixture) { + apply plugin: 'elasticsearch.test.fixtures' + testFixtures.useFixture(fixture.path, 'gcs-fixture-repositories-metering') +} + +integTest { + dependsOn repositoryPlugin.bundlePlugin + systemProperty 'test.gcs.bucket', gcsBucket + nonInputProperties.systemProperty 'test.gcs.base_path', gcsBasePath + "_repositories_metering" + BuildParams.testSeed +} + +testClusters.integTest { + testDistribution = 'DEFAULT' + plugin repositoryPlugin.bundlePlugin.archiveFile + + keystore 'gcs.client.repositories_metering.credentials_file', serviceAccountFile, IGNORE_VALUE + if (useFixture) { + tasks.integTest.dependsOn createServiceAccountFile + /* Use a closure on the string to delay evaluation until tests are executed */ + setting 'gcs.client.repositories_metering.endpoint', { "${-> fixtureAddress('gcs-fixture-repositories-metering')}" }, IGNORE_VALUE + setting 'gcs.client.repositories_metering.token_uri', { "${-> fixtureAddress('gcs-fixture-repositories-metering')}/o/oauth2/token" }, IGNORE_VALUE + } else { + println "Using an external service to test " + project.name + } +} + +task gcsThirdPartyTest { + dependsOn integTest +} diff --git a/x-pack/plugin/repositories-metering-api/qa/gcs/src/test/java/org/elasticsearch/xpack/repositories/metering/gcs/GCSRepositoriesMeteringIT.java b/x-pack/plugin/repositories-metering-api/qa/gcs/src/test/java/org/elasticsearch/xpack/repositories/metering/gcs/GCSRepositoriesMeteringIT.java new file mode 100644 index 0000000000000..76db2afbf0ca6 --- /dev/null +++ b/x-pack/plugin/repositories-metering-api/qa/gcs/src/test/java/org/elasticsearch/xpack/repositories/metering/gcs/GCSRepositoriesMeteringIT.java @@ -0,0 +1,53 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.repositories.metering.gcs; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.xpack.repositories.metering.AbstractRepositoriesMeteringAPIRestTestCase; + +import java.util.List; +import java.util.Map; + +public class GCSRepositoriesMeteringIT extends AbstractRepositoriesMeteringAPIRestTestCase { + + @Override + protected String repositoryType() { + return "gcs"; + } + + @Override + protected Map repositoryLocation() { + return org.elasticsearch.common.collect.Map.of( + "bucket", + getProperty("test.gcs.bucket"), + "base_path", + getProperty("test.gcs.base_path") + ); + } + + @Override + protected Settings repositorySettings() { + final String bucket = getProperty("test.gcs.bucket"); + final String basePath = getProperty("test.gcs.base_path"); + + return Settings.builder().put("client", "repositories_metering").put("bucket", bucket).put("base_path", basePath).build(); + } + + @Override + protected Settings updatedRepositorySettings() { + return Settings.builder().put(repositorySettings()).put("gcs.client.repositories_metering.application_name", "updated").build(); + } + + @Override + protected List readCounterKeys() { + return org.elasticsearch.common.collect.List.of("GetObject", "ListObjects"); + } + + @Override + protected List writeCounterKeys() { + return org.elasticsearch.common.collect.List.of("InsertObject"); + } +} diff --git a/x-pack/plugin/repositories-metering-api/qa/s3/build.gradle b/x-pack/plugin/repositories-metering-api/qa/s3/build.gradle new file mode 100644 index 0000000000000..63e2d46bb0321 --- /dev/null +++ b/x-pack/plugin/repositories-metering-api/qa/s3/build.gradle @@ -0,0 +1,75 @@ +import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE +import org.elasticsearch.gradle.info.BuildParams + +apply plugin: 'elasticsearch.standalone-rest-test' +apply plugin: 'elasticsearch.rest-test' +apply plugin: 'elasticsearch.rest-resources' + +final Project fixture = project(':test:fixtures:s3-fixture') +final Project repositoryPlugin = project(':plugins:repository-s3') + +dependencies { + testImplementation project(path: xpackModule('repositories-metering-api'), configuration: 'testArtifacts') + testImplementation repositoryPlugin +} + +restResources { + restApi { + includeCore 'indices', 'bulk', 'snapshot', 'nodes', '_common' + includeXpack 'repositories-metering-api' + } +} + +boolean useFixture = false +String s3AccessKey = System.getenv("amazon_s3_access_key") +String s3SecretKey = System.getenv("amazon_s3_secret_key") +String s3Bucket = System.getenv("amazon_s3_bucket") +String s3BasePath = System.getenv("amazon_s3_base_path") + +if (!s3AccessKey && !s3SecretKey && !s3Bucket && !s3BasePath) { + s3AccessKey = 'access_key' + s3SecretKey = 'secret_key' + s3Bucket = 'bucket' + s3BasePath = null + useFixture = true + +} else if (!s3AccessKey || !s3SecretKey || !s3Bucket || !s3BasePath) { + throw new IllegalArgumentException("not all options specified to run against external S3 service are present") +} + +if (useFixture) { + apply plugin: 'elasticsearch.test.fixtures' + testFixtures.useFixture(fixture.path, 's3-fixture-repositories-metering') +} + +integTest { + dependsOn repositoryPlugin.bundlePlugin + systemProperty 'test.s3.bucket', s3Bucket + nonInputProperties.systemProperty 'test.s3.base_path', s3BasePath ? s3BasePath + "_repositories_metering" + BuildParams.testSeed : 'base_path' +} + +testClusters.integTest { + testDistribution = 'DEFAULT' + plugin repositoryPlugin.bundlePlugin.archiveFile + + keystore 's3.client.repositories_metering.access_key', s3AccessKey + keystore 's3.client.repositories_metering.secret_key', s3SecretKey + + if (useFixture) { + def fixtureAddress = { fixtureName -> + assert useFixture: 'closure should not be used without a fixture' + int ephemeralPort = fixture.postProcessFixture.ext."test.fixtures.${fixtureName}.tcp.80" + assert ephemeralPort > 0 + '127.0.0.1:' + ephemeralPort + } + setting 's3.client.repositories_metering.protocol', 'http' + setting 's3.client.repositories_metering.endpoint', { "${-> fixtureAddress('s3-fixture-repositories-metering')}" }, IGNORE_VALUE + + } else { + println "Using an external service to test " + project.name + } +} + +task s3ThirdPartyTest { + dependsOn integTest +} diff --git a/x-pack/plugin/repositories-metering-api/qa/s3/src/test/java/org/elasticsearch/xpack/repositories/metering/s3/S3RepositoriesMeteringIT.java b/x-pack/plugin/repositories-metering-api/qa/s3/src/test/java/org/elasticsearch/xpack/repositories/metering/s3/S3RepositoriesMeteringIT.java new file mode 100644 index 0000000000000..1ac29def7e65f --- /dev/null +++ b/x-pack/plugin/repositories-metering-api/qa/s3/src/test/java/org/elasticsearch/xpack/repositories/metering/s3/S3RepositoriesMeteringIT.java @@ -0,0 +1,54 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.repositories.metering.s3; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.xpack.repositories.metering.AbstractRepositoriesMeteringAPIRestTestCase; + +import java.util.List; +import java.util.Map; + +public class S3RepositoriesMeteringIT extends AbstractRepositoriesMeteringAPIRestTestCase { + + @Override + protected String repositoryType() { + return "s3"; + } + + @Override + protected Map repositoryLocation() { + return org.elasticsearch.common.collect.Map.of( + "bucket", + getProperty("test.s3.bucket"), + "base_path", + getProperty("test.s3.base_path") + ); + } + + @Override + protected Settings repositorySettings() { + final String bucket = getProperty("test.s3.bucket"); + final String basePath = getProperty("test.s3.base_path"); + + return Settings.builder().put("client", "repositories_metering").put("bucket", bucket).put("base_path", basePath).build(); + } + + @Override + protected Settings updatedRepositorySettings() { + Settings settings = repositorySettings(); + return Settings.builder().put(settings).put("s3.client.max_retries", 4).build(); + } + + @Override + protected List readCounterKeys() { + return org.elasticsearch.common.collect.List.of("GetObject", "ListObjects"); + } + + @Override + protected List writeCounterKeys() { + return org.elasticsearch.common.collect.List.of("PutObject"); + } +} diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/RepositoriesMeteringPlugin.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/RepositoriesMeteringPlugin.java new file mode 100644 index 0000000000000..794e6655397da --- /dev/null +++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/RepositoriesMeteringPlugin.java @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.repositories.metering; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.xpack.repositories.metering.action.ClearRepositoriesMeteringArchiveAction; +import org.elasticsearch.xpack.repositories.metering.action.RepositoriesMeteringAction; +import org.elasticsearch.xpack.repositories.metering.action.TransportClearRepositoriesStatsArchiveAction; +import org.elasticsearch.xpack.repositories.metering.action.TransportRepositoriesStatsAction; +import org.elasticsearch.xpack.repositories.metering.rest.RestClearRepositoriesMeteringArchiveAction; +import org.elasticsearch.xpack.repositories.metering.rest.RestGetRepositoriesMeteringAction; + +import java.util.List; +import java.util.function.Supplier; + +public final class RepositoriesMeteringPlugin extends Plugin implements ActionPlugin { + + @Override + public List> getActions() { + return org.elasticsearch.common.collect.List.of( + new ActionHandler<>(RepositoriesMeteringAction.INSTANCE, TransportRepositoriesStatsAction.class), + new ActionHandler<>(ClearRepositoriesMeteringArchiveAction.INSTANCE, TransportClearRepositoriesStatsArchiveAction.class) + ); + } + + @Override + public List getRestHandlers( + Settings settings, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster + ) { + return org.elasticsearch.common.collect.List.of( + new RestGetRepositoriesMeteringAction(), + new RestClearRepositoriesMeteringArchiveAction() + ); + } +} diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/ClearRepositoriesMeteringArchiveAction.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/ClearRepositoriesMeteringArchiveAction.java new file mode 100644 index 0000000000000..4ddddc198e454 --- /dev/null +++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/ClearRepositoriesMeteringArchiveAction.java @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.repositories.metering.action; + +import org.elasticsearch.action.ActionType; + +public final class ClearRepositoriesMeteringArchiveAction extends ActionType { + public static final ClearRepositoriesMeteringArchiveAction INSTANCE = new ClearRepositoriesMeteringArchiveAction(); + + static final String NAME = "cluster:monitor/xpack/repositories_metering/clear_metering_archive"; + + ClearRepositoriesMeteringArchiveAction() { + super(NAME, RepositoriesMeteringResponse::new); + } +} diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/ClearRepositoriesMeteringArchiveRequest.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/ClearRepositoriesMeteringArchiveRequest.java new file mode 100644 index 0000000000000..98f0a8ec0a6c2 --- /dev/null +++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/ClearRepositoriesMeteringArchiveRequest.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.repositories.metering.action; + +import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +public final class ClearRepositoriesMeteringArchiveRequest extends BaseNodesRequest { + private final long maxVersionToClear; + + public ClearRepositoriesMeteringArchiveRequest(StreamInput in) throws IOException { + super(in); + this.maxVersionToClear = in.readLong(); + } + + public ClearRepositoriesMeteringArchiveRequest(long maxVersionToClear, String... nodesIds) { + super(nodesIds); + this.maxVersionToClear = maxVersionToClear; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeLong(maxVersionToClear); + } + + public long getMaxVersionToClear() { + return maxVersionToClear; + } +} diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringAction.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringAction.java new file mode 100644 index 0000000000000..9d7cd8bbe7107 --- /dev/null +++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringAction.java @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.repositories.metering.action; + +import org.elasticsearch.action.ActionType; + +public final class RepositoriesMeteringAction extends ActionType { + public static final RepositoriesMeteringAction INSTANCE = new RepositoriesMeteringAction(); + + static final String NAME = "cluster:monitor/xpack/repositories_metering/get_metrics"; + + RepositoriesMeteringAction() { + super(NAME, RepositoriesMeteringResponse::new); + } +} diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringRequest.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringRequest.java new file mode 100644 index 0000000000000..92119da4f0027 --- /dev/null +++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringRequest.java @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.repositories.metering.action; + +import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; + +public final class RepositoriesMeteringRequest extends BaseNodesRequest { + public RepositoriesMeteringRequest(StreamInput in) throws IOException { + super(in); + } + + public RepositoriesMeteringRequest(String... nodesIds) { + super(nodesIds); + } +} diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringResponse.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringResponse.java new file mode 100644 index 0000000000000..382066c06b695 --- /dev/null +++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringResponse.java @@ -0,0 +1,53 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.repositories.metering.action; + +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.List; + +public final class RepositoriesMeteringResponse extends BaseNodesResponse implements ToXContentFragment { + + public RepositoriesMeteringResponse(StreamInput in) throws IOException { + super(in); + } + + public RepositoriesMeteringResponse( + ClusterName clusterName, + List nodes, + List failures + ) { + super(clusterName, nodes, failures); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readList(RepositoriesNodeMeteringResponse::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeList(nodes); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("nodes"); + for (RepositoriesNodeMeteringResponse nodeStats : getNodes()) { + nodeStats.toXContent(builder, params); + } + builder.endObject(); + return builder; + } +} diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesNodeMeteringResponse.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesNodeMeteringResponse.java new file mode 100644 index 0000000000000..cf463e6f9b22a --- /dev/null +++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesNodeMeteringResponse.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.repositories.metering.action; + +import org.elasticsearch.action.support.nodes.BaseNodeResponse; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.repositories.RepositoryStatsSnapshot; + +import java.io.IOException; +import java.util.List; + +public final class RepositoriesNodeMeteringResponse extends BaseNodeResponse implements ToXContentFragment { + + final List repositoryStatsSnapshots; + + public RepositoriesNodeMeteringResponse(DiscoveryNode node, List repositoryStatsSnapshots) { + super(node); + this.repositoryStatsSnapshots = repositoryStatsSnapshots; + } + + public RepositoriesNodeMeteringResponse(StreamInput in) throws IOException { + super(in); + this.repositoryStatsSnapshots = in.readList(RepositoryStatsSnapshot::new); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startArray(getNode().getId()); + for (RepositoryStatsSnapshot repositoryStats : repositoryStatsSnapshots) { + repositoryStats.toXContent(builder, params); + } + builder.endArray(); + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeList(repositoryStatsSnapshots); + } +} diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/TransportClearRepositoriesStatsArchiveAction.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/TransportClearRepositoriesStatsArchiveAction.java new file mode 100644 index 0000000000000..9116edc2bf84c --- /dev/null +++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/TransportClearRepositoriesStatsArchiveAction.java @@ -0,0 +1,98 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.repositories.metering.action; + +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.nodes.BaseNodeRequest; +import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.RepositoryStatsSnapshot; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; + +public final class TransportClearRepositoriesStatsArchiveAction extends TransportNodesAction< + ClearRepositoriesMeteringArchiveRequest, + RepositoriesMeteringResponse, + TransportClearRepositoriesStatsArchiveAction.ClearRepositoriesStatsArchiveNodeRequest, + RepositoriesNodeMeteringResponse> { + + private final RepositoriesService repositoriesService; + + @Inject + public TransportClearRepositoriesStatsArchiveAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + RepositoriesService repositoriesService + ) { + super( + ClearRepositoriesMeteringArchiveAction.NAME, + threadPool, + clusterService, + transportService, + actionFilters, + ClearRepositoriesMeteringArchiveRequest::new, + ClearRepositoriesStatsArchiveNodeRequest::new, + ThreadPool.Names.SAME, + RepositoriesNodeMeteringResponse.class + ); + this.repositoriesService = repositoriesService; + } + + @Override + protected RepositoriesMeteringResponse newResponse( + ClearRepositoriesMeteringArchiveRequest request, + List nodesResponses, + List failures + ) { + return new RepositoriesMeteringResponse(clusterService.getClusterName(), nodesResponses, failures); + } + + @Override + protected ClearRepositoriesStatsArchiveNodeRequest newNodeRequest(ClearRepositoriesMeteringArchiveRequest request) { + return new ClearRepositoriesStatsArchiveNodeRequest(request.getMaxVersionToClear()); + } + + @Override + protected RepositoriesNodeMeteringResponse newNodeResponse(StreamInput in) throws IOException { + return new RepositoriesNodeMeteringResponse(in); + } + + @Override + protected RepositoriesNodeMeteringResponse nodeOperation(ClearRepositoriesStatsArchiveNodeRequest request) { + List clearedStats = repositoriesService.clearRepositoriesStatsArchive(request.maxVersionToClear); + return new RepositoriesNodeMeteringResponse(clusterService.localNode(), clearedStats); + } + + static final class ClearRepositoriesStatsArchiveNodeRequest extends BaseNodeRequest { + private final long maxVersionToClear; + + ClearRepositoriesStatsArchiveNodeRequest(long maxVersionToClear) { + this.maxVersionToClear = maxVersionToClear; + } + + ClearRepositoriesStatsArchiveNodeRequest(StreamInput in) throws IOException { + super(in); + this.maxVersionToClear = in.readLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeLong(maxVersionToClear); + } + } +} diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/TransportRepositoriesStatsAction.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/TransportRepositoriesStatsAction.java new file mode 100644 index 0000000000000..2bf8adb0dea97 --- /dev/null +++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/TransportRepositoriesStatsAction.java @@ -0,0 +1,84 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.repositories.metering.action; + +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.nodes.BaseNodeRequest; +import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; + +public final class TransportRepositoriesStatsAction extends TransportNodesAction< + RepositoriesMeteringRequest, + RepositoriesMeteringResponse, + TransportRepositoriesStatsAction.RepositoriesNodeStatsRequest, + RepositoriesNodeMeteringResponse> { + + private final RepositoriesService repositoriesService; + + @Inject + public TransportRepositoriesStatsAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + RepositoriesService repositoriesService + ) { + super( + RepositoriesMeteringAction.NAME, + threadPool, + clusterService, + transportService, + actionFilters, + RepositoriesMeteringRequest::new, + RepositoriesNodeStatsRequest::new, + ThreadPool.Names.SAME, + RepositoriesNodeMeteringResponse.class + ); + this.repositoriesService = repositoriesService; + } + + @Override + protected RepositoriesMeteringResponse newResponse( + RepositoriesMeteringRequest request, + List repositoriesNodeStatsResponses, + List failures + ) { + return new RepositoriesMeteringResponse(clusterService.getClusterName(), repositoriesNodeStatsResponses, failures); + } + + @Override + protected RepositoriesNodeStatsRequest newNodeRequest(RepositoriesMeteringRequest request) { + return new RepositoriesNodeStatsRequest(); + } + + @Override + protected RepositoriesNodeMeteringResponse newNodeResponse(StreamInput in) throws IOException { + return new RepositoriesNodeMeteringResponse(in); + } + + @Override + protected RepositoriesNodeMeteringResponse nodeOperation(RepositoriesNodeStatsRequest request) { + return new RepositoriesNodeMeteringResponse(clusterService.localNode(), repositoriesService.repositoriesStats()); + } + + static final class RepositoriesNodeStatsRequest extends BaseNodeRequest { + RepositoriesNodeStatsRequest() {} + + RepositoriesNodeStatsRequest(StreamInput in) throws IOException { + super(in); + } + } +} diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/rest/RestClearRepositoriesMeteringArchiveAction.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/rest/RestClearRepositoriesMeteringArchiveAction.java new file mode 100644 index 0000000000000..2177092e3b84e --- /dev/null +++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/rest/RestClearRepositoriesMeteringArchiveAction.java @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.repositories.metering.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestActions; +import org.elasticsearch.xpack.repositories.metering.action.ClearRepositoriesMeteringArchiveAction; +import org.elasticsearch.xpack.repositories.metering.action.ClearRepositoriesMeteringArchiveRequest; + +import java.util.List; + +public class RestClearRepositoriesMeteringArchiveAction extends BaseRestHandler { + @Override + public String getName() { + return "clear_repositories_metrics_archive_action"; + } + + @Override + public List routes() { + return org.elasticsearch.common.collect.List.of( + new Route(RestRequest.Method.DELETE, "/_nodes/{nodeId}/_repositories_metering/{maxVersionToClear}") + ); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId")); + long maxVersionToClear = request.paramAsLong("maxVersionToClear", -1); + ClearRepositoriesMeteringArchiveRequest clearArchivesRequest = new ClearRepositoriesMeteringArchiveRequest( + maxVersionToClear, + nodesIds + ); + return channel -> client.execute( + ClearRepositoriesMeteringArchiveAction.INSTANCE, + clearArchivesRequest, + new RestActions.NodesResponseRestListener<>(channel) + ); + } +} diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/rest/RestGetRepositoriesMeteringAction.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/rest/RestGetRepositoriesMeteringAction.java new file mode 100644 index 0000000000000..197171cf582ee --- /dev/null +++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/rest/RestGetRepositoriesMeteringAction.java @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.repositories.metering.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestActions; +import org.elasticsearch.xpack.repositories.metering.action.RepositoriesMeteringRequest; +import org.elasticsearch.xpack.repositories.metering.action.RepositoriesMeteringAction; + +import java.util.List; + +public final class RestGetRepositoriesMeteringAction extends BaseRestHandler { + + @Override + public String getName() { + return "get_repositories_metering_action"; + } + + @Override + public List routes() { + return org.elasticsearch.common.collect.List.of(new Route(RestRequest.Method.GET, "/_nodes/{nodeId}/_repositories_metering")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId")); + RepositoriesMeteringRequest repositoriesMeteringRequest = new RepositoriesMeteringRequest(nodesIds); + return channel -> client.execute( + RepositoriesMeteringAction.INSTANCE, + repositoriesMeteringRequest, + new RestActions.NodesResponseRestListener<>(channel) + ); + } +} diff --git a/x-pack/plugin/repositories-metering-api/src/test/java/org/elasticsearch/xpack/repositories/metering/AbstractRepositoriesMeteringAPIRestTestCase.java b/x-pack/plugin/repositories-metering-api/src/test/java/org/elasticsearch/xpack/repositories/metering/AbstractRepositoriesMeteringAPIRestTestCase.java new file mode 100644 index 0000000000000..7455325c7c2ec --- /dev/null +++ b/x-pack/plugin/repositories-metering-api/src/test/java/org/elasticsearch/xpack/repositories/metering/AbstractRepositoriesMeteringAPIRestTestCase.java @@ -0,0 +1,374 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.repositories.metering; + +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpPost; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.CheckedBiConsumer; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.repositories.RepositoryInfo; +import org.elasticsearch.repositories.RepositoryStats; +import org.elasticsearch.repositories.RepositoryStatsSnapshot; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.junit.Before; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.blankOrNullString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; + +public abstract class AbstractRepositoriesMeteringAPIRestTestCase extends ESRestTestCase { + protected abstract String repositoryType(); + + protected abstract Map repositoryLocation(); + + protected abstract Settings repositorySettings(); + + /** + * New settings to force a new repository creation + */ + protected abstract Settings updatedRepositorySettings(); + + protected abstract List readCounterKeys(); + + protected abstract List writeCounterKeys(); + + @Before + public void clearArchive() throws Exception { + clearRepositoriesStats(Long.MAX_VALUE); + } + + public void testStatsAreTracked() throws Exception { + snapshotAndRestoreIndex((repository, index) -> { + List repoStats = getRepositoriesStats(); + assertThat(repoStats.size(), equalTo(1)); + + RepositoryStatsSnapshot repositoryStats = repoStats.get(0); + assertRepositoryStatsBelongToRepository(repositoryStats, repository); + assertRequestCountersAccountedForReads(repositoryStats); + assertRequestCountersAccountedForWrites(repositoryStats); + }); + } + + public void testStatsAreUpdatedAfterRepositoryOperations() throws Exception { + String snapshot = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + snapshotAndRestoreIndex(snapshot, (repository, index) -> { + List repoStatsBeforeRestore = getRepositoriesStats(); + assertThat(repoStatsBeforeRestore.size(), equalTo(1)); + + RepositoryStatsSnapshot repositoryStatsBeforeRestore = repoStatsBeforeRestore.get(0); + Map requestCountsBeforeRestore = repositoryStatsBeforeRestore.getRepositoryStats().requestCounts; + assertRepositoryStatsBelongToRepository(repositoryStatsBeforeRestore, repository); + assertRequestCountersAccountedForReads(repositoryStatsBeforeRestore); + assertRequestCountersAccountedForWrites(repositoryStatsBeforeRestore); + + deleteIndex(index); + + restoreSnapshot(repository, snapshot, true); + + List updatedRepoStats = getRepositoriesStats(); + assertThat(updatedRepoStats.size(), equalTo(1)); + RepositoryStatsSnapshot repoStatsAfterRestore = updatedRepoStats.get(0); + Map requestCountsAfterRestore = repoStatsAfterRestore.getRepositoryStats().requestCounts; + + for (String readCounterKey : readCounterKeys()) { + assertThat( + requestCountsAfterRestore.get(readCounterKey), + greaterThanOrEqualTo(requestCountsBeforeRestore.get(readCounterKey)) + ); + } + }); + } + + public void testClearRepositoriesStats() throws Exception { + snapshotAndRestoreIndex((repository, index) -> { + deleteRepository(repository); + + List repositoriesStatsBeforeClearing = getRepositoriesStats(); + assertThat(repositoriesStatsBeforeClearing.size(), equalTo(1)); + RepositoryStatsSnapshot repositoryStatsSnapshot = repositoriesStatsBeforeClearing.get(0); + + assertThat(clearRepositoriesStats(-1).size(), equalTo(0)); + + List removedRepositoriesStats = clearRepositoriesStats(repositoryStatsSnapshot.getClusterVersion()); + + assertThat(repositoriesStatsBeforeClearing, equalTo(removedRepositoriesStats)); + + assertThat(getRepositoriesStats().size(), equalTo(0)); + }); + } + + public void testRegisterMultipleRepositoriesAndGetStats() throws Exception { + List repositoryNames = org.elasticsearch.common.collect.List.of("repo-a", "repo-b", "repo-c"); + for (String repositoryName : repositoryNames) { + registerRepository(repositoryName, repositoryType(), false, repositorySettings()); + } + + List repositoriesStats = getRepositoriesStats(); + Map> repositoryStatsByName = repositoriesStats.stream() + .collect(Collectors.groupingBy(r -> r.getRepositoryInfo().name)); + + for (String repositoryName : repositoryNames) { + List repositoryStats = repositoryStatsByName.get(repositoryName); + assertThat(repositoryStats, is(notNullValue())); + assertThat(repositoryStats.size(), equalTo(1)); + + RepositoryStatsSnapshot stats = repositoryStats.get(0); + assertRepositoryStatsBelongToRepository(stats, repositoryName); + assertAllRequestCountsAreZero(stats); + } + } + + public void testStatsAreArchivedAfterRepositoryDeletion() throws Exception { + snapshotAndRestoreIndex((repository, index) -> { + List repositoriesStats = getRepositoriesStats(); + assertThat(repositoriesStats.size(), equalTo(1)); + RepositoryStatsSnapshot statsBeforeRepoDeletion = repositoriesStats.get(0); + assertRepositoryStatsBelongToRepository(statsBeforeRepoDeletion, repository); + + deleteRepository(repository); + + List repoStatsAfterDeletion = getRepositoriesStats(); + assertThat(repoStatsAfterDeletion.size(), equalTo(1)); + RepositoryStatsSnapshot statsAfterRepoDeletion = repoStatsAfterDeletion.get(0); + assertStatsAreEqualsIgnoringStoppedAt(statsBeforeRepoDeletion, statsAfterRepoDeletion); + }); + } + + public void testStatsAreStoredIntoANewCounterInstanceAfterRepoConfigUpdate() throws Exception { + final String snapshot = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + snapshotAndRestoreIndex(snapshot, (repository, index) -> { + List repositoriesStatsBeforeUpdate = getRepositoriesStats(); + assertThat(repositoriesStatsBeforeUpdate.size(), equalTo(1)); + assertRepositoryStatsBelongToRepository(repositoriesStatsBeforeUpdate.get(0), repository); + assertRequestCountersAccountedForReads(repositoriesStatsBeforeUpdate.get(0)); + assertRequestCountersAccountedForWrites(repositoriesStatsBeforeUpdate.get(0)); + + // Update repository + registerRepository(repository, repositoryType(), false, updatedRepositorySettings()); + + List repositoriesStatsAfterUpdate = getRepositoriesStats(); + + assertThat(repositoriesStatsAfterUpdate.size(), equalTo(2)); + assertStatsAreEqualsIgnoringStoppedAt(repositoriesStatsBeforeUpdate.get(0), repositoriesStatsAfterUpdate.get(0)); + + // The counters for the new repository instance are zero + assertAllRequestCountsAreZero(repositoriesStatsAfterUpdate.get(1)); + + deleteIndex(index); + + restoreSnapshot(repository, snapshot, true); + + List repoStatsAfterRestore = getRepositoriesStats(); + + assertThat(repoStatsAfterRestore.size(), equalTo(2)); + assertStatsAreEqualsIgnoringStoppedAt(repositoriesStatsAfterUpdate.get(0), repoStatsAfterRestore.get(0)); + + assertRequestCountersAccountedForReads(repoStatsAfterRestore.get(1)); + }); + } + + public void testDeleteThenAddRepositoryWithTheSameName() throws Exception { + snapshotAndRestoreIndex((repository, index) -> { + List repoStatsBeforeDeletion = getRepositoriesStats(); + assertThat(repoStatsBeforeDeletion.size(), equalTo(1)); + + deleteRepository(repository); + + List repoStatsAfterDeletion = getRepositoriesStats(); + assertThat(repoStatsAfterDeletion.size(), equalTo(1)); + assertStatsAreEqualsIgnoringStoppedAt(repoStatsBeforeDeletion.get(0), repoStatsAfterDeletion.get(0)); + + registerRepository(repository, repositoryType(), false, repositorySettings()); + + List repositoriesStatsAfterRegisteringTheSameRepo = getRepositoriesStats(); + assertThat(repositoriesStatsAfterRegisteringTheSameRepo.size(), equalTo(2)); + assertStatsAreEqualsIgnoringStoppedAt(repoStatsBeforeDeletion.get(0), repositoriesStatsAfterRegisteringTheSameRepo.get(0)); + assertAllRequestCountsAreZero(repositoriesStatsAfterRegisteringTheSameRepo.get(1)); + }); + } + + private void snapshotAndRestoreIndex(CheckedBiConsumer biConsumer) throws Exception { + final String snapshot = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + snapshotAndRestoreIndex(snapshot, biConsumer); + } + + private void snapshotAndRestoreIndex(String snapshot, CheckedBiConsumer biConsumer) throws Exception { + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + final String repository = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + final int numberOfShards = randomIntBetween(1, 5); + + final String repositoryType = repositoryType(); + final Settings repositorySettings = repositorySettings(); + + registerRepository(repository, repositoryType, true, repositorySettings); + + createIndex( + indexName, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + ); + ensureGreen(indexName); + + final int numDocs = randomIntBetween(1, 500); + final StringBuilder bulkBody = new StringBuilder(); + for (int i = 0; i < numDocs; i++) { + bulkBody.append("{\"index\":{\"_id\":\"").append(i).append("\"}}\n"); + bulkBody.append("{\"field\":").append(i).append(",\"text\":\"Document number ").append(i).append("\"}\n"); + } + + final Request documents = new Request(HttpPost.METHOD_NAME, '/' + indexName + "/_bulk"); + documents.addParameter("refresh", Boolean.TRUE.toString()); + documents.setJsonEntity(bulkBody.toString()); + assertOK(client().performRequest(documents)); + + createSnapshot(repository, snapshot, true); + + deleteIndex(indexName); + + restoreSnapshot(repository, snapshot, true); + + biConsumer.accept(repository, indexName); + } + + private void assertRequestCountersAccountedForReads(RepositoryStatsSnapshot statsSnapshot) { + RepositoryStats repositoryStats = statsSnapshot.getRepositoryStats(); + Map requestCounts = repositoryStats.requestCounts; + for (String readCounterKey : readCounterKeys()) { + assertThat(requestCounts.get(readCounterKey), is(notNullValue())); + assertThat(requestCounts.get(readCounterKey), is(greaterThan(0L))); + } + } + + private void assertRequestCountersAccountedForWrites(RepositoryStatsSnapshot statsSnapshot) { + RepositoryStats repositoryStats = statsSnapshot.getRepositoryStats(); + Map requestCounts = repositoryStats.requestCounts; + for (String writeCounterKey : writeCounterKeys()) { + assertThat(requestCounts.get(writeCounterKey), is(notNullValue())); + assertThat(requestCounts.get(writeCounterKey), is(greaterThan(0L))); + } + } + + private void assertStatsAreEqualsIgnoringStoppedAt(RepositoryStatsSnapshot stats, RepositoryStatsSnapshot otherStats) { + assertRepositoryInfoIsEqualIgnoringStoppedAt(stats.getRepositoryInfo(), otherStats.getRepositoryInfo()); + assertThat(stats.getRepositoryStats(), equalTo(otherStats.getRepositoryStats())); + } + + private void assertRepositoryInfoIsEqualIgnoringStoppedAt(RepositoryInfo repositoryInfo, RepositoryInfo otherRepositoryInfo) { + assertThat(repositoryInfo.ephemeralId, equalTo(otherRepositoryInfo.ephemeralId)); + assertThat(repositoryInfo.name, equalTo(otherRepositoryInfo.name)); + assertThat(repositoryInfo.type, equalTo(otherRepositoryInfo.type)); + assertThat(repositoryInfo.location, equalTo(otherRepositoryInfo.location)); + assertThat(repositoryInfo.startedAt, equalTo(otherRepositoryInfo.startedAt)); + } + + private void assertRepositoryStatsBelongToRepository(RepositoryStatsSnapshot stats, String repositoryName) { + RepositoryInfo repositoryInfo = stats.getRepositoryInfo(); + assertThat(repositoryInfo.name, equalTo(repositoryName)); + assertThat(repositoryInfo.type, equalTo(repositoryType())); + assertThat(repositoryInfo.location, equalTo(repositoryLocation())); + } + + private void assertAllRequestCountsAreZero(RepositoryStatsSnapshot statsSnapshot) { + RepositoryStats stats = statsSnapshot.getRepositoryStats(); + for (long requestCount : stats.requestCounts.values()) { + assertThat(requestCount, equalTo(0)); + } + } + + private List getRepositoriesStats() throws IOException { + Map response = getAsMap("/_nodes/_all/_repositories_metering"); + return parseRepositoriesStatsResponse(response); + } + + private List parseRepositoriesStatsResponse(Map response) throws IOException { + Map>> nodesRepoStats = extractValue(response, "nodes"); + assertThat(response.size(), greaterThan(0)); + List repositoriesStats = new ArrayList<>(); + for (String nodeId : getNodeIds()) { + List> nodeStats = nodesRepoStats.get(nodeId); + assertThat(nodeStats, is(notNullValue())); + + for (Map nodeStatSnapshot : nodeStats) { + RepositoryInfo repositoryInfo = parseRepositoryInfo(nodeStatSnapshot); + Map intRequestCounters = extractValue(nodeStatSnapshot, "request_counts"); + boolean archived = extractValue(nodeStatSnapshot, "archived"); + int clusterVersion = (int) RepositoryStatsSnapshot.UNKNOWN_CLUSTER_VERSION; + if (archived) { + clusterVersion = extractValue(nodeStatSnapshot, "cluster_version"); + } + Map requestCounters = intRequestCounters.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().longValue())); + RepositoryStats repositoryStats = new RepositoryStats(requestCounters); + RepositoryStatsSnapshot statsSnapshot = new RepositoryStatsSnapshot( + repositoryInfo, + repositoryStats, + clusterVersion, + archived + ); + repositoriesStats.add(statsSnapshot); + } + } + return repositoriesStats; + } + + private RepositoryInfo parseRepositoryInfo(Map nodeStatSnapshot) { + String id = extractValue(nodeStatSnapshot, "repository_ephemeral_id"); + String name = extractValue(nodeStatSnapshot, "repository_name"); + String type = extractValue(nodeStatSnapshot, "repository_type"); + Map location = extractValue(nodeStatSnapshot, "repository_location"); + Long startedAt = extractValue(nodeStatSnapshot, "repository_started_at"); + Long stoppedAt = extractValue(nodeStatSnapshot, "repository_stopped_at"); + return new RepositoryInfo(id, name, type, location, startedAt, stoppedAt); + } + + private Set getNodeIds() throws IOException { + Map nodes = extractValue(getAsMap("_nodes/"), "nodes"); + return nodes.keySet(); + } + + private List clearRepositoriesStats(long maxVersionToClear) throws IOException { + final Request request = new Request(HttpDelete.METHOD_NAME, "/_nodes/_all/_repositories_metering/" + maxVersionToClear); + final Response response = client().performRequest(request); + assertThat( + "Failed to clear repositories stats: " + response, + response.getStatusLine().getStatusCode(), + equalTo(RestStatus.OK.getStatus()) + ); + return parseRepositoriesStatsResponse(responseAsMap(response)); + } + + @SuppressWarnings("unchecked") + protected static T extractValue(Map map, String path) { + return (T) XContentMapValues.extractValue(path, map); + } + + protected String getProperty(String propertyName) { + final String property = System.getProperty(propertyName); + assertThat(property, not(blankOrNullString())); + return property; + } +} diff --git a/x-pack/plugin/repositories-metering-api/src/test/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringResponseTests.java b/x-pack/plugin/repositories-metering-api/src/test/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringResponseTests.java new file mode 100644 index 0000000000000..9f37ab84cb667 --- /dev/null +++ b/x-pack/plugin/repositories-metering-api/src/test/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringResponseTests.java @@ -0,0 +1,104 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.repositories.metering.action; + +import org.elasticsearch.Version; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.repositories.RepositoryInfo; +import org.elasticsearch.repositories.RepositoryStats; +import org.elasticsearch.repositories.RepositoryStatsSnapshot; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public class RepositoriesMeteringResponseTests extends ESTestCase { + public void testSerializationRoundtrip() throws Exception { + final RepositoriesMeteringResponse repositoriesMeteringResponse = createResponse(); + final RepositoriesMeteringResponse deserializedResponse = copyWriteable( + repositoriesMeteringResponse, + writableRegistry(), + RepositoriesMeteringResponse::new, + Version.CURRENT + ); + assertResponsesAreEqual(repositoriesMeteringResponse, deserializedResponse); + } + + private void assertResponsesAreEqual(RepositoriesMeteringResponse response, RepositoriesMeteringResponse otherResponse) { + List nodeResponses = response.getNodes(); + List otherNodeResponses = otherResponse.getNodes(); + assertThat(nodeResponses.size(), equalTo(otherNodeResponses.size())); + for (int i = 0; i < nodeResponses.size(); i++) { + RepositoriesNodeMeteringResponse nodeResponse = nodeResponses.get(i); + RepositoriesNodeMeteringResponse otherNodeResponse = otherNodeResponses.get(i); + assertThat(nodeResponse.repositoryStatsSnapshots, equalTo(otherNodeResponse.repositoryStatsSnapshots)); + } + + List failures = response.failures(); + List otherFailures = otherResponse.failures(); + assertThat(failures.size(), equalTo(otherFailures.size())); + for (int i = 0; i < failures.size(); i++) { + FailedNodeException failure = failures.get(i); + FailedNodeException otherFailure = otherFailures.get(i); + assertThat(failure.nodeId(), equalTo(otherFailure.nodeId())); + assertThat(failure.getMessage(), equalTo(otherFailure.getMessage())); + } + } + + private RepositoriesMeteringResponse createResponse() { + ClusterName clusterName = new ClusterName("test"); + int nodes = randomIntBetween(1, 10); + List nodeResponses = new ArrayList<>(nodes); + for (int nodeId = 0; nodeId < nodes; nodeId++) { + DiscoveryNode node = new DiscoveryNode("nodeId" + nodeId, buildNewFakeTransportAddress(), Version.CURRENT); + int numberOfRepos = randomInt(10); + List nodeRepoStats = new ArrayList<>(numberOfRepos); + + for (int clusterVersion = 0; clusterVersion < numberOfRepos; clusterVersion++) { + String repoId = randomAlphaOfLength(10); + String repoName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + String repoType = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + Map repoLocation = org.elasticsearch.common.collect.Map.of( + "bucket", + randomAlphaOfLength(10).toLowerCase(Locale.ROOT) + ); + long startedAt = System.currentTimeMillis() - 1; + Long stoppedAt = randomBoolean() ? System.currentTimeMillis() : null; + RepositoryInfo repositoryInfo = new RepositoryInfo(repoId, repoName, repoType, repoLocation, startedAt, stoppedAt); + boolean archived = randomBoolean(); + RepositoryStatsSnapshot statsSnapshot = new RepositoryStatsSnapshot( + repositoryInfo, + new RepositoryStats(org.elasticsearch.common.collect.Map.of("GET", randomLongBetween(0, 2000))), + archived ? clusterVersion : RepositoryStatsSnapshot.UNKNOWN_CLUSTER_VERSION, + archived + ); + nodeRepoStats.add(statsSnapshot); + } + + nodeResponses.add(new RepositoriesNodeMeteringResponse(node, nodeRepoStats)); + } + + int numberOfFailures = randomInt(20); + List failures = new ArrayList<>(numberOfFailures); + for (int i = nodes; i < numberOfFailures + nodes; i++) { + FailedNodeException failedNodeException = new FailedNodeException( + "nodeId" + i, + "error", + randomBoolean() ? new RuntimeException("boom") : null + ); + failures.add(failedNodeException); + } + + return new RepositoriesMeteringResponse(clusterName, nodeResponses, failures); + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsRestTestCase.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsRestTestCase.java index 00ac39c0773fe..0dd84a588c00c 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsRestTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsRestTestCase.java @@ -8,20 +8,14 @@ import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; -import org.apache.http.client.methods.HttpPut; -import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.index.query.QueryBuilder; @@ -31,7 +25,6 @@ import org.elasticsearch.test.rest.ESRestTestCase; import java.io.IOException; -import java.io.InputStream; import java.util.List; import java.util.Locale; import java.util.Map; @@ -39,7 +32,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.notNullValue; public abstract class AbstractSearchableSnapshotsRestTestCase extends ESRestTestCase { @@ -250,26 +242,6 @@ public void assertSearchResults(String indexName, int numDocs, Boolean ignoreThr } } - protected static void registerRepository(String repository, String type, boolean verify, Settings settings) throws IOException { - final Request request = new Request(HttpPut.METHOD_NAME, "_snapshot/" + repository); - request.setJsonEntity(Strings.toString(new PutRepositoryRequest(repository).type(type).verify(verify).settings(settings))); - - final Response response = client().performRequest(request); - assertAcked("Failed to create repository [" + repository + "] of type [" + type + "]: " + response, response); - } - - protected static void createSnapshot(String repository, String snapshot, boolean waitForCompletion) throws IOException { - final Request request = new Request(HttpPut.METHOD_NAME, "_snapshot/" + repository + '/' + snapshot); - request.addParameter("wait_for_completion", Boolean.toString(waitForCompletion)); - - final Response response = client().performRequest(request); - assertThat( - "Failed to create snapshot [" + snapshot + "] in repository [" + repository + "]: " + response, - response.getStatusLine().getStatusCode(), - equalTo(RestStatus.OK.getStatus()) - ); - } - protected static void deleteSnapshot(String repository, String snapshot, boolean ignoreMissing) throws IOException { final Request request = new Request(HttpDelete.METHOD_NAME, "_snapshot/" + repository + '/' + snapshot); try { @@ -406,19 +378,6 @@ protected static Map indexSettings(String index) throws IOExcept return extractValue(responseAsMap(response), index + ".settings"); } - protected static Map responseAsMap(Response response) throws IOException { - final XContentType xContentType = XContentType.fromMediaTypeOrFormat(response.getEntity().getContentType().getValue()); - assertThat("Unknown XContentType", xContentType, notNullValue()); - - BytesReference bytesReference = Streams.readFully(response.getEntity().getContent()); - - try (InputStream responseBody = bytesReference.streamInput()) { - return XContentHelper.convertToMap(xContentType.xContent(), responseBody, true); - } catch (Exception e) { - throw new IOException(bytesReference.utf8ToString(), e); - } - } - @SuppressWarnings("unchecked") protected static T extractValue(Map map, String path) { return (T) XContentMapValues.extractValue(path, map);