From de25e1c3e1b3d42aa95ffd55fbff8caeaadc51e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Witek?= Date: Mon, 22 Jan 2024 17:05:40 +0100 Subject: [PATCH] [Transform] Fix bug when `latest` transform is used together with `from` parameter (#104606) --- docs/changelog/104606.yaml | 6 + .../integration/TransformLatestRestIT.java | 113 ++++++++++++++++++ .../transforms/TransformIndexer.java | 2 +- 3 files changed, 120 insertions(+), 1 deletion(-) create mode 100644 docs/changelog/104606.yaml diff --git a/docs/changelog/104606.yaml b/docs/changelog/104606.yaml new file mode 100644 index 0000000000000..f419c21e0a17d --- /dev/null +++ b/docs/changelog/104606.yaml @@ -0,0 +1,6 @@ +pr: 104606 +summary: Fix bug when `latest` transform is used together with `from` parameter +area: Transform +type: bug +issues: + - 104543 diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformLatestRestIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformLatestRestIT.java index 9224b838b058b..af5fe9f180a4e 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformLatestRestIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformLatestRestIT.java @@ -147,4 +147,117 @@ public void testLatestWithAggregateMetricDoubleAsUniqueKey() throws Exception { ); } } + + public void testContinuousLatestWithFrom_NoDocs() throws Exception { + testContinuousLatestWithFrom("latest_from_no_docs", "reviews_from_no_docs", "2017-02-20", 0); + } + + public void testContinuousLatestWithFrom_OneDoc() throws Exception { + testContinuousLatestWithFrom("latest_from_one_doc", "reviews_from_one_doc", "2017-02-10", 1); + } + + public void testContinuousLatestWithFrom_AllDocs_FromNull() throws Exception { + testContinuousLatestWithFrom("latest_from_all_docs_from_null", "reviews_from_all_docs_from_null", null, 28); + } + + public void testContinuousLatestWithFrom_AllDocs() throws Exception { + testContinuousLatestWithFrom("latest_from_all_docs", "reviews_from_all_docs", "2017-01-01", 28); + } + + private void testContinuousLatestWithFrom(String transformId, String indexName, String from, int expectedDestNumDocs) throws Exception { + createReviewsIndex(indexName); + String transformIndex = transformId + "-dest"; + setupDataAccessRole(DATA_ACCESS_ROLE, indexName, transformIndex); + Request createTransformRequest = createRequestWithAuth( + "PUT", + getTransformEndpoint() + transformId, + BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS + ); + String config = Strings.format(""" + { + "source": { + "index": "%s" + }, + "dest": { + "index": "%s" + }, + "frequency": "1s", + "sync": { + "time": { + "field": "timestamp", + "delay": "1s" + } + }, + "latest": { + "unique_key": [ "user_id" ], + "sort": "timestamp" + } + }""", indexName, transformIndex); + createTransformRequest.setJsonEntity(config); + Map createTransformResponse = entityAsMap(client().performRequest(createTransformRequest)); + assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + + assertSourceIndexContents(indexName, 1000, "2017-01-10T10:10:10.000Z", "2017-01-30T22:34:38.000Z"); + + { + StringBuilder bulk = new StringBuilder(); + bulk.append(Strings.format(""" + {"index":{"_index":"%s"}} + {"user_id":"user_%s","business_id":"business_%s","stars":%s,"location":"%s","timestamp":%s} + """, indexName, 666, 777, 7, 888, "\"2017-02-15\"")); + bulk.append("\r\n"); + + Request bulkRequest = new Request("POST", "/_bulk"); + bulkRequest.addParameter("refresh", "true"); + bulkRequest.setJsonEntity(bulk.toString()); + Map bulkResponse = entityAsMap(client().performRequest(bulkRequest)); + assertThat(bulkResponse.get("errors"), equalTo(Boolean.FALSE)); + } + + assertSourceIndexContents(indexName, 1001, "2017-01-10T10:10:10.000Z", "2017-02-15T00:00:00.000Z"); + + startAndWaitForContinuousTransform(transformId, transformIndex, null, from, 1L); + assertTrue(indexExists(transformIndex)); + + Map transformIndexStats = getAsMap(transformIndex + "/_stats"); + assertThat( + "Stats were: " + transformIndexStats, + XContentMapValues.extractValue("_all.total.docs.count", transformIndexStats), + is(equalTo(expectedDestNumDocs)) + ); + + stopTransform(transformId, false); + deleteIndex(indexName); + } + + private void assertSourceIndexContents(String indexName, int expectedNumDocs, String expectedMinTimestamp, String expectedMaxTimestamp) + throws IOException { + Request searchRequest = new Request("GET", indexName + "/_search"); + searchRequest.setJsonEntity(""" + { + "size": 0, + "aggregations": { + "min_timestamp": { + "min": { + "field": "timestamp" + } + }, + "max_timestamp": { + "max": { + "field": "timestamp" + } + } + } + }"""); + Map searchResponse = entityAsMap(client().performRequest(searchRequest)); + assertThat(XContentMapValues.extractValue("hits.total.value", searchResponse), is(equalTo(expectedNumDocs))); + assertThat( + XContentMapValues.extractValue("aggregations.min_timestamp.value_as_string", searchResponse), + is(equalTo(expectedMinTimestamp)) + ); + assertThat( + XContentMapValues.extractValue("aggregations.max_timestamp.value_as_string", searchResponse), + is(equalTo(expectedMaxTimestamp)) + ); + } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 365393ae36fb5..f6d4ae2d53c9a 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -1177,7 +1177,7 @@ protected boolean shouldAuditOnFinish(long completedCheckpoint) { } private RunState determineRunStateAtStart() { - if (context.from() != null) { + if (context.from() != null && changeCollector != null && changeCollector.queryForChanges()) { return RunState.IDENTIFY_CHANGES; }