Skip to content

Commit

Permalink
[Transform] Fix bug when latest transform is used together with `fr…
Browse files Browse the repository at this point in the history
…om` parameter (elastic#104606)
  • Loading branch information
przemekwitek authored Jan 22, 2024
1 parent ffcb124 commit de25e1c
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 1 deletion.
6 changes: 6 additions & 0 deletions docs/changelog/104606.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 104606
summary: Fix bug when `latest` transform is used together with `from` parameter
area: Transform
type: bug
issues:
- 104543
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,117 @@ public void testLatestWithAggregateMetricDoubleAsUniqueKey() throws Exception {
);
}
}

public void testContinuousLatestWithFrom_NoDocs() throws Exception {
testContinuousLatestWithFrom("latest_from_no_docs", "reviews_from_no_docs", "2017-02-20", 0);
}

public void testContinuousLatestWithFrom_OneDoc() throws Exception {
testContinuousLatestWithFrom("latest_from_one_doc", "reviews_from_one_doc", "2017-02-10", 1);
}

public void testContinuousLatestWithFrom_AllDocs_FromNull() throws Exception {
testContinuousLatestWithFrom("latest_from_all_docs_from_null", "reviews_from_all_docs_from_null", null, 28);
}

public void testContinuousLatestWithFrom_AllDocs() throws Exception {
testContinuousLatestWithFrom("latest_from_all_docs", "reviews_from_all_docs", "2017-01-01", 28);
}

private void testContinuousLatestWithFrom(String transformId, String indexName, String from, int expectedDestNumDocs) throws Exception {
createReviewsIndex(indexName);
String transformIndex = transformId + "-dest";
setupDataAccessRole(DATA_ACCESS_ROLE, indexName, transformIndex);
Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
);
String config = Strings.format("""
{
"source": {
"index": "%s"
},
"dest": {
"index": "%s"
},
"frequency": "1s",
"sync": {
"time": {
"field": "timestamp",
"delay": "1s"
}
},
"latest": {
"unique_key": [ "user_id" ],
"sort": "timestamp"
}
}""", indexName, transformIndex);
createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));

assertSourceIndexContents(indexName, 1000, "2017-01-10T10:10:10.000Z", "2017-01-30T22:34:38.000Z");

{
StringBuilder bulk = new StringBuilder();
bulk.append(Strings.format("""
{"index":{"_index":"%s"}}
{"user_id":"user_%s","business_id":"business_%s","stars":%s,"location":"%s","timestamp":%s}
""", indexName, 666, 777, 7, 888, "\"2017-02-15\""));
bulk.append("\r\n");

Request bulkRequest = new Request("POST", "/_bulk");
bulkRequest.addParameter("refresh", "true");
bulkRequest.setJsonEntity(bulk.toString());
Map<String, Object> bulkResponse = entityAsMap(client().performRequest(bulkRequest));
assertThat(bulkResponse.get("errors"), equalTo(Boolean.FALSE));
}

assertSourceIndexContents(indexName, 1001, "2017-01-10T10:10:10.000Z", "2017-02-15T00:00:00.000Z");

startAndWaitForContinuousTransform(transformId, transformIndex, null, from, 1L);
assertTrue(indexExists(transformIndex));

Map<String, Object> transformIndexStats = getAsMap(transformIndex + "/_stats");
assertThat(
"Stats were: " + transformIndexStats,
XContentMapValues.extractValue("_all.total.docs.count", transformIndexStats),
is(equalTo(expectedDestNumDocs))
);

stopTransform(transformId, false);
deleteIndex(indexName);
}

private void assertSourceIndexContents(String indexName, int expectedNumDocs, String expectedMinTimestamp, String expectedMaxTimestamp)
throws IOException {
Request searchRequest = new Request("GET", indexName + "/_search");
searchRequest.setJsonEntity("""
{
"size": 0,
"aggregations": {
"min_timestamp": {
"min": {
"field": "timestamp"
}
},
"max_timestamp": {
"max": {
"field": "timestamp"
}
}
}
}""");
Map<String, Object> searchResponse = entityAsMap(client().performRequest(searchRequest));
assertThat(XContentMapValues.extractValue("hits.total.value", searchResponse), is(equalTo(expectedNumDocs)));
assertThat(
XContentMapValues.extractValue("aggregations.min_timestamp.value_as_string", searchResponse),
is(equalTo(expectedMinTimestamp))
);
assertThat(
XContentMapValues.extractValue("aggregations.max_timestamp.value_as_string", searchResponse),
is(equalTo(expectedMaxTimestamp))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,7 @@ protected boolean shouldAuditOnFinish(long completedCheckpoint) {
}

private RunState determineRunStateAtStart() {
if (context.from() != null) {
if (context.from() != null && changeCollector != null && changeCollector.queryForChanges()) {
return RunState.IDENTIFY_CHANGES;
}

Expand Down

0 comments on commit de25e1c

Please sign in to comment.