-
Notifications
You must be signed in to change notification settings - Fork 24.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce global checkpoint listeners #32696
Changes from 16 commits
3d175ab
2d31212
ec2f458
06db241
eab2a87
c34a054
a67e6aa
f26ce14
9e5ce52
30a16f6
8ac2ed5
22d13a8
dc5d326
ea3c198
50a9a6c
7e77278
48089fb
5b63507
9533679
88dee76
aad650a
07dab4f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.index.shard; | ||
|
||
import org.apache.logging.log4j.Logger; | ||
import org.apache.logging.log4j.message.ParameterizedMessage; | ||
|
||
import java.io.Closeable; | ||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.concurrent.Executor; | ||
|
||
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; | ||
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; | ||
|
||
/** | ||
* Represents a collection of global checkpoint listeners. This collection can be added to, and all listeners present at the time of an | ||
* update will be notified together. All listeners will be notified when the shard is closed. | ||
*/ | ||
public class GlobalCheckpointListeners implements Closeable { | ||
|
||
/** | ||
* A global checkpoint listener consisting of a callback that is notified when the global checkpoint is updated or the shard is closed. | ||
*/ | ||
@FunctionalInterface | ||
public interface GlobalCheckpointListener { | ||
/** | ||
* Callback when the global checkpoint is updated or the shard is closed. If the shard is closed, the value of the global checkpoint | ||
* will be set to {@link org.elasticsearch.index.seqno.SequenceNumbers#UNASSIGNED_SEQ_NO} and the exception will be non-null. If the | ||
* global checkpoint is updated, the exception will be null. | ||
* | ||
* @param globalCheckpoint the updated global checkpoint | ||
* @param e if non-null, the shard is closed | ||
*/ | ||
void accept(long globalCheckpoint, IndexShardClosedException e); | ||
} | ||
|
||
// guarded by this | ||
private boolean closed; | ||
private volatile List<GlobalCheckpointListener> listeners; | ||
private long lastKnownGlobalCheckpoint = UNASSIGNED_SEQ_NO; | ||
|
||
private final ShardId shardId; | ||
private final Executor executor; | ||
private final Logger logger; | ||
|
||
/** | ||
* Construct a global checkpoint listeners collection. | ||
* | ||
* @param shardId the shard ID on which global checkpoint updates can be listened to | ||
* @param executor the executor for listener notifications | ||
* @param logger a shard-level logger | ||
*/ | ||
GlobalCheckpointListeners( | ||
final ShardId shardId, | ||
final Executor executor, | ||
final Logger logger) { | ||
this.shardId = Objects.requireNonNull(shardId); | ||
this.executor = Objects.requireNonNull(executor); | ||
this.logger = Objects.requireNonNull(logger); | ||
} | ||
|
||
/** | ||
* Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the | ||
* listener will be asynchronously notified on the executor used to construct this collection of global checkpoint listeners. If the | ||
* shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global | ||
* checkpoint listeners. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you mention here that the listener will be deregistered on notification? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I pushed 5b63507. |
||
* | ||
* @param currentGlobalCheckpoint the current global checkpoint known to the listener | ||
* @param listener the listener | ||
*/ | ||
synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpointListener listener) { | ||
if (closed) { | ||
executor.execute(() -> listener.accept(UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId))); | ||
} | ||
if (lastKnownGlobalCheckpoint > currentGlobalCheckpoint) { | ||
// notify directly | ||
executor.execute(() -> notifyListener(listener, lastKnownGlobalCheckpoint, null)); | ||
} else { | ||
if (listeners == null) { | ||
listeners = new ArrayList<>(); | ||
} | ||
listeners.add(listener); | ||
} | ||
} | ||
|
||
@Override | ||
public void close() throws IOException { | ||
synchronized (this) { | ||
closed = true; | ||
} | ||
notifyListeners(UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId)); | ||
} | ||
|
||
/** | ||
* Invoke to notify all registered listeners of an updated global checkpoint. | ||
* | ||
* @param globalCheckpoint the updated global checkpoint | ||
*/ | ||
void globalCheckpointUpdated(final long globalCheckpoint) { | ||
assert globalCheckpoint >= NO_OPS_PERFORMED; | ||
synchronized (this) { | ||
lastKnownGlobalCheckpoint = globalCheckpoint; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the add method assumes that the global checkpoint is always strictly increasing. Add an assertion here that this is so? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I pushed 48089fb. |
||
} | ||
notifyListeners(globalCheckpoint, null); | ||
} | ||
|
||
private void notifyListeners(final long globalCheckpoint, final IndexShardClosedException e) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for simplicity, let's move this under the mutex everywhere (and add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. I pushed 88dee76. |
||
assert (globalCheckpoint == UNASSIGNED_SEQ_NO && e != null) || (globalCheckpoint >= NO_OPS_PERFORMED && e == null); | ||
if (listeners != null) { | ||
final List<GlobalCheckpointListener> currentListeners; | ||
synchronized (this) { | ||
currentListeners = listeners; | ||
listeners = null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks as if the listeners are only notified once and then need to reregister if they want more events? I can see why this kind of behavior makes sense for the refresh listeners, with refresh being an expensive operation that ensures that all events registered before the refresh will now see the changes they're waiting for. With global checkpoints, it's less clear to me, as they can be potentially updated many many times per second, so wouldn't you want to stay registered to receive events. If not, will this lead to a storm of reregister events? I think I need to better understand the integration point here, i.e., how this API will be used. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @ywelsch See #32651. The intended usage is in CCR where a remote cluster will, when the remote cluster is fully caught up, (remotely) attach a single-use listener to the local cluster for the next global checkpoint change. When the global checkpoint is updated, the listener will be invoked which will return a response to the remote cluster that will act as letting the remote cluster know that there are now additional changes to be fetched. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, given this behavior, I wonder if we can make the listener interface simpler (same as Boaz's concern). As the listeners are only notified once and then need to reregister if they want more events, I wonder if it's simpler to just signal an UNASSIGNED_SEQ_NO (or the lastKnownGlobalCheckpoint) on a closing and then have the caller fail on a repeated call to the add method (by throwing directly the exception in that method, not relaying it to the listener). This means that the listener can remain a functional interface (but just a LongConsumer). WDYT? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @ywelsch Personally I do not buy the argument that @Override
protected void asyncShardOperation(
final Request request, final ShardId shardId, final ActionListener<Response> listener) throws IOException {
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShard(shardId.id());
indexShard.addGlobalCheckpointListener(
request.getGlobalCheckpoint(),
(g, e) -> {
if (g != UNASSIGNED_SEQ_NO) {
listener.onResponse(new Response(g));
} else {
listener.onFailure(e);
}
});
} is less clean than @Override
protected void asyncShardOperation(
final Request request, final ShardId shardId, final ActionListener<Response> listener) throws IOException {
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShard(shardId.id());
indexShard.addGlobalCheckpointListener(
request.getGlobalCheckpoint(),
g -> {
if (g != UNASSIGNED_SEQ_NO) {
listener.onResponse(new Response(g));
} else {
listener.onFailure(new IndexShardClosedException(shardId));
}
});
} We still have to have a check in one form or the other whether or not closing has been signaled to us, so there's always going to be an
In fact, I would argue the approach I have taken is cleaner as it's the shard telling us that we are closed rather than it being signaled indirectly through the value of the global checkpoint passed in the callback. Sure the actual interface is simpler but I prefer the explicit approach. So I think we should either stick with what I have, or have an Regarding throwing on attempting to register a listener on a closed shard, that was my preferred approach too but @bleskes thought otherwise. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There also still the option of adding an onClosed method to the interface with a default NOOP implementation. It will remain a functional interface, and if most tests don't care about the shard closed case, they can treat it like a functional interface. I'll leave the decision to you. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would not be a functional interface for the purposes of the production use-case for this that I have in mind (we have to handle the shard closed event). Thanks, I will leave as-is. |
||
} | ||
if (currentListeners != null) { | ||
executor.execute(() -> { | ||
for (final GlobalCheckpointListener listener : currentListeners) { | ||
notifyListener(listener, globalCheckpoint, e); | ||
} | ||
}); | ||
} | ||
} | ||
} | ||
|
||
private void notifyListener(final GlobalCheckpointListener listener, final long globalCheckpoint, final IndexShardClosedException e) { | ||
try { | ||
listener.accept(globalCheckpoint, e); | ||
} catch (final Exception caught) { | ||
if (globalCheckpoint != UNASSIGNED_SEQ_NO) { | ||
logger.warn( | ||
new ParameterizedMessage( | ||
"error notifying global checkpoint listener of updated global checkpoint [{}]", | ||
globalCheckpoint), | ||
caught); | ||
} else { | ||
logger.warn("error notifying global checkpoint listener of closed shard", caught); | ||
} | ||
} | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I wonder if we should have an onFailure method here for all kind of failures and send the IndexShardClosedException down that route. The down side is of course that people wouldn't be able to pass a method references, but the method won't need to start with if (e != null) etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer to use a functional interface for enabling the use of lambda expressions.