Merge branch 'main' into test/99778/mappings-update-race-condition
williamrandolph committed Jan 10, 2024
2 parents f28080d + 709c0f5 commit 1913cc8
Showing 110 changed files with 2,331 additions and 1,009 deletions.
4 changes: 2 additions & 2 deletions .buildkite/pipelines/periodic.template.yml
@@ -182,13 +182,13 @@ steps:
machineType: n2-standard-8
buildDirectory: /dev/shm/bk
if: build.branch == "main" || build.branch == "7.17"
- label: Check branch consistency
- label: check-branch-consistency
command: .ci/scripts/run-gradle.sh branchConsistency
timeout_in_minutes: 15
agents:
provider: gcp
image: family/elasticsearch-ubuntu-2004
machineType: n2-standard-2
- label: Check branch protection rules
- label: check-branch-protection-rules
command: .buildkite/scripts/branch-protection.sh
timeout_in_minutes: 5
4 changes: 2 additions & 2 deletions .buildkite/pipelines/periodic.yml
@@ -1303,13 +1303,13 @@ steps:
machineType: n2-standard-8
buildDirectory: /dev/shm/bk
if: build.branch == "main" || build.branch == "7.17"
- label: Check branch consistency
- label: check-branch-consistency
command: .ci/scripts/run-gradle.sh branchConsistency
timeout_in_minutes: 15
agents:
provider: gcp
image: family/elasticsearch-ubuntu-2004
machineType: n2-standard-2
- label: Check branch protection rules
- label: check-branch-protection-rules
command: .buildkite/scripts/branch-protection.sh
timeout_in_minutes: 5
@@ -98,7 +98,8 @@ buildScan {
def branch = System.getenv('BUILDKITE_PULL_REQUEST_BASE_BRANCH') ?: System.getenv('BUILDKITE_BRANCH')
def repoMatcher = System.getenv('BUILDKITE_REPO') =~ /(https:\/\/github\.com\/|git@github\.com:)(\S+)\.git/
def repository = repoMatcher.matches() ? repoMatcher.group(2) : "<unknown>"
def jobName = (System.getenv('BUILDKITE_LABEL') ?: '').replaceAll(/[^a-zA-Z0-9_\-]+/, ' ').trim().replaceAll(' ', '_').toLowerCase()
def jobLabel = System.getenv('BUILDKITE_LABEL') ?: ''
def jobName = safeName(jobLabel)

tag 'CI'
link 'CI Build', "${buildKiteUrl}#${System.getenv('BUILDKITE_JOB_ID')}"
@@ -111,6 +112,11 @@ buildScan {

value 'Job Name', jobName
tag jobName
if (jobLabel.contains("/")) {
jobLabel.split("/").collect {safeName(it) }.each {matrix ->
tag matrix
}
}

if (branch) {
tag branch
@@ -161,3 +167,7 @@ buildScan {
}
}
}

static def safeName(String string) {
return string.replaceAll(/[^a-zA-Z0-9_\-\.]+/, ' ').trim().replaceAll(' ', '_').toLowerCase()
}
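The Groovy `safeName` helper above normalizes a Buildkite label into a tag-friendly string, and the matrix handling earlier in the diff splits a `/`-separated label into one tag per segment. A standalone Java port (hypothetical and for illustration only; the real helper stays in the Groovy build script) shows the same transformations:

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the build-scan name sanitizer above (the real helper
// lives in the Groovy build script; this Java port is for illustration only).
public class SafeName {

    // Collapse runs of disallowed characters to a space, trim, then join the
    // remaining words with underscores and lowercase the result.
    static String safeName(String s) {
        return s.replaceAll("[^a-zA-Z0-9_\\-.]+", " ").trim().replaceAll(" ", "_").toLowerCase();
    }

    // Mirrors the matrix handling: a label containing "/" yields one sanitized
    // tag per segment; any other label yields no matrix tags.
    static List<String> matrixTags(String jobLabel) {
        List<String> tags = new ArrayList<>();
        if (jobLabel.contains("/")) {
            for (String part : jobLabel.split("/")) {
                tags.add(safeName(part));
            }
        }
        return tags;
    }

    public static void main(String[] args) {
        System.out.println(safeName("Check branch consistency"));
        System.out.println(matrixTags("part-1 / AMAZING stage!"));
    }
}
```

This is also why the two pipeline labels above were renamed to their already-sanitized forms: the label and the derived job name now match exactly.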
6 changes: 6 additions & 0 deletions docs/changelog/100813.yaml
@@ -0,0 +1,6 @@
pr: 100813
summary: Make `ParentTaskAssigningClient.getRemoteClusterClient` method also return
`ParentTaskAssigningClient`
area: Infra/Transport API
type: enhancement
issues: []
5 changes: 5 additions & 0 deletions docs/changelog/104043.yaml
@@ -0,0 +1,5 @@
pr: 104043
summary: Expose service account authentication metrics
area: Authentication
type: enhancement
issues: []
5 changes: 5 additions & 0 deletions docs/changelog/104092.yaml
@@ -0,0 +1,5 @@
pr: 104092
summary: Ingest geoip processor cache 'no results' from the database
area: Ingest Node
type: enhancement
issues: []
5 changes: 5 additions & 0 deletions docs/changelog/104113.yaml
@@ -0,0 +1,5 @@
pr: 104113
summary: "X-pack/plugin/apm-data: fix `@custom` pipeline support"
area: Ingest Node
type: bug
issues: []
2 changes: 1 addition & 1 deletion docs/reference/data-streams/tsds-index-settings.asciidoc
@@ -28,7 +28,7 @@ value (exclusive) accepted by the index. Only indices with an `index.mode` of
`index.look_ahead_time`::
(<<_static_index_settings,Static>>, <<time-units,time units>>)
Interval used to calculate the `index.time_series.end_time` for a TSDS's write
index. Defaults to `2h` (2 hours). Accepts `1m` (one minute) to `2h` (two
index. Defaults to `30m` (30 minutes). Accepts `1m` (one minute) to `2h` (two
hours). Only indices with an `index.mode` of `time_series` support this setting.
For more information, refer to <<tsds-look-ahead-time>>. Additionally this setting
can not be less than `time_series.poll_interval` cluster setting.
7 changes: 7 additions & 0 deletions docs/reference/search/search-your-data/knn-search.asciidoc
@@ -814,12 +814,19 @@ Now we have filtered based on the top level `"creation_time"` and only one document
----
// TESTRESPONSE[s/"took": 4/"took" : "$body.took"/]

[discrete]
[[nested-knn-search-inner-hits]]
==== Nested kNN Search with Inner hits

Additionally, if you wanted to extract the nearest passage for a matched document, you can supply <<inner-hits, inner_hits>>
to the `knn` clause.

NOTE: `inner_hits` for kNN will only ever return a single hit, the nearest passage vector.
Setting `"size"` to any value greater than `1` will have no effect on the results.

NOTE: When using `inner_hits` and multiple `knn` clauses, be sure to specify the <<inner-hits-options,`inner_hits.name`>>
field. Otherwise, a naming clash can occur and fail the search request.

[source,console]
----
POST passage_vectors/_search
10 changes: 8 additions & 2 deletions docs/reference/troubleshooting/corruption-issues.asciidoc
@@ -38,6 +38,13 @@ well-tested, so you can be very confident that a checksum mismatch really does
indicate that the data read from disk is different from the data that {es}
previously wrote.

If a file header is corrupted then it's possible that {es} might not even be
able to work out how to start reading the file, which can lead to an exception
such as:

- `org.apache.lucene.index.IndexFormatTooOldException`
- `org.apache.lucene.index.IndexFormatTooNewException`

It is also possible that {es} reports a corruption if a file it needs is
entirely missing, with an exception such as:

@@ -50,8 +57,7 @@ system previously confirmed to {es} that this file was durably synced to disk.
On Linux this means that the `fsync()` system call returned successfully. {es}
sometimes reports that an index is corrupt because a file needed for recovery
is missing, or it exists but has been truncated or is missing its footer. This
indicates that your storage system acknowledges durable writes incorrectly or
that some external process has modified the data {es} previously wrote to disk.
may indicate that your storage system acknowledges durable writes incorrectly.

There are many possible explanations for {es} detecting corruption in your
cluster. Databases like {es} generate a challenging I/O workload that may find
@@ -18,12 +18,26 @@
import java.util.function.Function;

/**
* The in-memory cache for the geoip data. There should only be 1 instance of this class..
* The in-memory cache for the geoip data. There should only be 1 instance of this class.
* This cache differs from Maxmind's {@link NodeCache} in that it stores the deserialized JSON objects to avoid the
* cost of deserialization for each lookup (cached or not). This comes at the slight expense of higher memory usage, but a
* significant reduction in CPU usage.
*/
final class GeoIpCache {

/**
* Internal-only sentinel object for recording that a result from the geoip database was null (i.e. there was no result). By caching
* this no-result we can distinguish between something not being in the cache because we haven't searched for that data yet, versus
* something not being in the cache because the data doesn't exist in the database.
*/
// visible for testing
static final AbstractResponse NO_RESULT = new AbstractResponse() {
@Override
public String toString() {
return "AbstractResponse[NO_RESULT]";
}
};

private final Cache<CacheKey, AbstractResponse> cache;

// package private for testing
@@ -40,18 +54,27 @@ <T extends AbstractResponse> T putIfAbsent(
String databasePath,
Function<InetAddress, AbstractResponse> retrieveFunction
) {

// can't use cache.computeIfAbsent due to the elevated permissions for the jackson (run via the cache loader)
CacheKey cacheKey = new CacheKey(ip, databasePath);
// intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
AbstractResponse response = cache.get(cacheKey);

// populate the cache for this key, if necessary
if (response == null) {
response = retrieveFunction.apply(ip);
if (response != null) {
cache.put(cacheKey, response);
// if the response from the database was null, then use the no-result sentinel value
if (response == null) {
response = NO_RESULT;
}
// store the result or no-result in the cache
cache.put(cacheKey, response);
}

if (response == NO_RESULT) {
return null; // the no-result sentinel is an internal detail, don't expose it
} else {
return (T) response;
}
return (T) response;
}
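The change above can be distilled into a generic sketch (hypothetical names, and a plain `HashMap` instead of the bounded Elasticsearch `Cache`): a private sentinel object is cached in place of `null`, so a repeated lookup for a missing entry never re-runs the loader.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Generic sketch of the no-result caching pattern above. Names and the
// backing HashMap are illustrative; the real cache is bounded and keyed on
// (ip, databasePath).
public class NullCachingCache<K, V> {

    // Sentinel recording "we looked this up and the answer was: nothing".
    private static final Object NO_RESULT = new Object();

    private final Map<K, Object> cache = new HashMap<>();

    @SuppressWarnings("unchecked")
    public V putIfAbsent(K key, Function<K, V> loader) {
        Object cached = cache.get(key);
        if (cached == null) {
            V loaded = loader.apply(key);
            // cache the absence of a result, not just results
            cached = (loaded == null) ? NO_RESULT : loaded;
            cache.put(key, cached);
        }
        // the sentinel is an internal detail; callers still see null
        return cached == NO_RESULT ? null : (V) cached;
    }
}
```

A loader that returns `null` is invoked once per key; subsequent lookups hit the sentinel and return `null` without touching the database.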

// only useful for testing
@@ -172,10 +172,8 @@ private Map<String, Object> getGeoData(GeoIpDatabase geoIpDatabase, String ip) t
geoData = retrieveCityGeoData(geoIpDatabase, ipAddress);
} else if (databaseType.endsWith(COUNTRY_DB_SUFFIX)) {
geoData = retrieveCountryGeoData(geoIpDatabase, ipAddress);

} else if (databaseType.endsWith(ASN_DB_SUFFIX)) {
geoData = retrieveAsnGeoData(geoIpDatabase, ipAddress);

} else {
throw new ElasticsearchParseException(
"Unsupported database type [" + geoIpDatabase.getDatabaseType() + "]",
@@ -13,6 +13,10 @@
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.test.ESTestCase;

import java.net.InetAddress;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import static org.mockito.Mockito.mock;

public class GeoIpCacheTests extends ESTestCase {
@@ -36,6 +40,23 @@ public void testCachesAndEvictsResults() {
assertNotSame(response1, cache.get(InetAddresses.forString("127.0.0.1"), "path/to/db"));
}

public void testCachesNoResult() {
GeoIpCache cache = new GeoIpCache(1);
final AtomicInteger count = new AtomicInteger(0);
Function<InetAddress, AbstractResponse> countAndReturnNull = (ip) -> {
count.incrementAndGet();
return null;
};

AbstractResponse response = cache.putIfAbsent(InetAddresses.forString("127.0.0.1"), "path/to/db", countAndReturnNull);
assertNull(response);
assertNull(cache.putIfAbsent(InetAddresses.forString("127.0.0.1"), "path/to/db", countAndReturnNull));
assertEquals(1, count.get());

// the cached value is not actually *null*, it's the NO_RESULT sentinel
assertSame(GeoIpCache.NO_RESULT, cache.get(InetAddresses.forString("127.0.0.1"), "path/to/db"));
}

public void testCacheKey() {
GeoIpCache cache = new GeoIpCache(2);
AbstractResponse response1 = mock(AbstractResponse.class);
@@ -418,6 +418,11 @@ private static SubscribableListener<Void> createSnapshotPausedListener(
) {
return ClusterServiceUtils.addTemporaryStateListener(clusterService, state -> {
final var entriesForRepo = SnapshotsInProgress.get(state).forRepo(repoName);
if (entriesForRepo.isEmpty()) {
// it's (just about) possible for the data node to apply the initial snapshot state, start on the first shard snapshot, and
// hit the IO block, before the master even applies this cluster state, in which case we simply retry:
return false;
}
assertThat(entriesForRepo, hasSize(1));
final var shardSnapshotStatuses = entriesForRepo.iterator()
.next()
@@ -181,6 +181,7 @@ static TransportVersion def(int id) {
public static final TransportVersion LAZY_ROLLOVER_ADDED = def(8_569_00_0);
public static final TransportVersion ESQL_PLAN_POINT_LITERAL_WKB = def(8_570_00_0);
public static final TransportVersion HOT_THREADS_AS_BYTES = def(8_571_00_0);
public static final TransportVersion ML_INFERENCE_REQUEST_INPUT_TYPE_ADDED = def(8_572_00_0);

/*
* STOP! READ THIS FIRST! No, really,
38 changes: 35 additions & 3 deletions server/src/main/java/org/elasticsearch/action/ActionType.java
@@ -8,16 +8,38 @@

package org.elasticsearch.action;

import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.transport.TransportService;

/**
* A generic action. Should strive to make it a singleton.
* An action which can be invoked by {@link Client#execute}. The implementation must be registered with the node using
* {@link ActionModule#setupActions} (for actions in the {@code :server} package) or {@link ActionPlugin#getActions} (for actions in
* plugins).
* <p>
* Typically, every {@link ActionType} instance is a global constant (i.e. a public static final field) called {@code INSTANCE} or {@code
* TYPE}. Some legacy implementations create custom subclasses of {@link ActionType} but this is unnecessary and somewhat wasteful. Prefer
* to create instances of this class directly whenever possible.
*/
public class ActionType<Response extends ActionResponse> {

private final String name;
private final Writeable.Reader<Response> responseReader;

/**
* Construct an {@link ActionType} which callers can execute on the local node (using {@link NodeClient}).
* <p>
* There is no facility for directly executing an action on a different node in the local cluster. To achieve this, implement an action
* which runs on the local node and knows how to use the {@link TransportService} to forward the request to a different node. There are
* several utilities that help implement such an action, including {@link TransportNodesAction} or {@link TransportMasterNodeAction}.
*
* @param name The name of the action, which must be unique across actions.
* @return an {@link ActionType} which callers can execute on the local node.
*/
public static <T extends ActionResponse> ActionType<T> localOnly(String name) {
return new ActionType<>(name, Writeable.Reader.localOnly());
}
@@ -27,8 +49,18 @@ public static ActionType<ActionResponse.Empty> emptyResponse(String name) {
}

/**
* @param name The name of the action, must be unique across actions.
* @param responseReader A reader for the response type
* Construct an {@link ActionType} which callers can execute both on the local node (using {@link NodeClient}) and on a remote cluster
* (using a client obtained from {@link Client#getRemoteClusterClient}). If the action is only to be executed on the local cluster then
* declare it using {@link #localOnly} instead.
* <p>
* There is no facility for directly executing an action on a different node in the local cluster. To achieve this, implement an action
* which runs on the local node and knows how to use the {@link TransportService} to forward the request to a different node. There are
* several utilities that help implement such an action, including {@link TransportNodesAction} or {@link TransportMasterNodeAction}.
*
* @param name The name of the action, which must be unique across actions. When executed on a remote cluster, this is the
* ID of the transport action which is sent to the handling node in the remote cluster.
* @param responseReader Defines how to deserialize responses received from executions of this action on remote clusters. Executions of
* this action on the local node receive the response object directly, without needing any deserialization.
*/
public ActionType(String name, Writeable.Reader<Response> responseReader) {
this.name = name;
@@ -16,6 +16,8 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;

import java.util.concurrent.Executor;

/**
* A {@linkplain Client} that sets the parent task on all requests that it makes. Use this to conveniently implement actions that cause
* many other actions.
@@ -58,4 +60,10 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
request.setParentTask(parentTask);
super.doExecute(action, request, listener);
}

@Override
public ParentTaskAssigningClient getRemoteClusterClient(String clusterAlias, Executor responseExecutor) {
Client remoteClient = super.getRemoteClusterClient(clusterAlias, responseExecutor);
return new ParentTaskAssigningClient(remoteClient, parentTask);
}
}
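The override above closes a gap where a client obtained via `getRemoteClusterClient` silently dropped the parent task association. A toy sketch of the same wrapper pattern (all names hypothetical) shows why a delegating wrapper must re-wrap every client it hands out:

```java
// Toy sketch of the wrapper pattern fixed above; all names are hypothetical.
public class Wrappers {

    interface SimpleClient {
        String execute(String request);
        SimpleClient getRemoteClusterClient(String clusterAlias);
    }

    // A plain client that echoes the request and can vend "remote" clients.
    static class PlainClient implements SimpleClient {
        @Override
        public String execute(String request) {
            return "handled:" + request;
        }

        @Override
        public SimpleClient getRemoteClusterClient(String clusterAlias) {
            return new PlainClient();
        }
    }

    // A wrapper that stamps every request with a parent id. Without the
    // getRemoteClusterClient override, the vended client would be unwrapped,
    // and requests to remote clusters would lose the stamp: the gap the
    // changelog entry for PR 100813 describes for parent task ids.
    static class StampingClient implements SimpleClient {
        private final SimpleClient delegate;
        private final String parentId;

        StampingClient(SimpleClient delegate, String parentId) {
            this.delegate = delegate;
            this.parentId = parentId;
        }

        @Override
        public String execute(String request) {
            return delegate.execute(parentId + "/" + request);
        }

        @Override
        public SimpleClient getRemoteClusterClient(String clusterAlias) {
            // re-wrap, so the derived client keeps stamping requests
            return new StampingClient(delegate.getRemoteClusterClient(clusterAlias), parentId);
        }
    }
}
```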
40 changes: 40 additions & 0 deletions server/src/main/java/org/elasticsearch/inference/InputType.java
@@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.inference;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;

import java.io.IOException;
import java.util.Locale;

/**
* Defines the type of request, whether the request is to ingest a document or search for a document.
*/
public enum InputType implements Writeable {
INGEST,
SEARCH;

public static String NAME = "input_type";

@Override
public String toString() {
return name().toLowerCase(Locale.ROOT);
}

public static InputType fromStream(StreamInput in) throws IOException {
return in.readEnum(InputType.class);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeEnum(this);
}
}
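`InputType` serializes itself with `writeEnum`/`readEnum`, which (to my understanding) encode the enum by its ordinal. A minimal sketch of that round trip, with hypothetical helper names and no Elasticsearch stream classes, highlights the resulting constraint: the declaration order of `INGEST` and `SEARCH` becomes part of the wire format and must not change once released.

```java
// Hypothetical sketch of ordinal-based enum wire encoding, analogous to the
// writeEnum/readEnum pair above: the ordinal is what goes over the wire, so
// reordering or inserting constants would break compatibility.
public class EnumWire {

    enum InputType { INGEST, SEARCH }

    static int writeEnum(InputType type) {
        return type.ordinal();
    }

    static InputType readEnum(int wireValue) {
        InputType[] values = InputType.values();
        if (wireValue < 0 || wireValue >= values.length) {
            throw new IllegalArgumentException("unknown InputType ordinal " + wireValue);
        }
        return values[wireValue];
    }
}
```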