Expose whether or not the global checkpoint updated #32659

Merged 6 commits on Aug 7, 2018


jasontedor
Member

It will be useful for future efforts to know if the global checkpoint was updated. To this end, we need to expose whether or not the global checkpoint was updated when the state of the replication tracker updates. For this, we add to the tracker a callback that is invoked whenever the global checkpoint is updated. For primaries this will be invoked when the computed global checkpoint is updated based on state changes to the tracker. For replicas this will be invoked when the local knowledge of the global checkpoint is advanced from the primary.

Relates #32651
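
The mechanism described above can be illustrated with a minimal sketch. This is not the Elasticsearch ReplicationTracker: the class name CheckpointTrackerSketch, its methods, and the sentinel value -1 are hypothetical and chosen only for illustration. It assumes the primary computes the global checkpoint as the minimum of the tracked local checkpoints, and that a replica only learns the value from the primary.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongConsumer;

/** Hypothetical, simplified stand-in for a replication tracker (illustration only). */
final class CheckpointTrackerSketch {
    // Assumed sentinel meaning "no operations performed yet".
    private static final long NO_OPS = -1L;

    private final Map<String, Long> localCheckpoints = new HashMap<>();
    private final LongConsumer onGlobalCheckpointUpdated;
    private long globalCheckpoint = NO_OPS;

    CheckpointTrackerSketch(final LongConsumer onGlobalCheckpointUpdated) {
        this.onGlobalCheckpointUpdated = onGlobalCheckpointUpdated;
    }

    /** Starts tracking a shard copy; its local checkpoint is initially NO_OPS. */
    synchronized void addCopy(final String allocationId) {
        localCheckpoints.putIfAbsent(allocationId, NO_OPS);
    }

    /** Primary side: a state change may advance the computed global checkpoint. */
    synchronized void updateLocalCheckpoint(final String allocationId, final long localCheckpoint) {
        final Long current = localCheckpoints.get(allocationId);
        if (current == null || localCheckpoint <= current) {
            return; // unknown copy, or the local checkpoint did not advance
        }
        localCheckpoints.put(allocationId, localCheckpoint);
        final long computed = Collections.min(localCheckpoints.values());
        if (computed > globalCheckpoint) {
            globalCheckpoint = computed;
            // Invoked under the lock, only when the value actually advances.
            onGlobalCheckpointUpdated.accept(computed);
        }
    }

    /** Replica side: local knowledge of the global checkpoint advances from the primary. */
    synchronized void updateGlobalCheckpointOnReplica(final long fromPrimary) {
        if (fromPrimary > globalCheckpoint) {
            globalCheckpoint = fromPrimary;
            onGlobalCheckpointUpdated.accept(fromPrimary);
        }
    }

    synchronized long getGlobalCheckpoint() {
        return globalCheckpoint;
    }
}
```

In this sketch the callback fires only when the value advances, so a caller that records invocations sees nothing when one copy moves ahead while another still lags.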

@jasontedor added the review, v7.0.0, :Distributed/Distributed, and v6.5.0 labels Aug 6, 2018
@jasontedor jasontedor requested a review from ywelsch August 6, 2018 22:02
@elasticmachine
Collaborator

Pinging @elastic/es-distributed

@jasontedor jasontedor requested a review from bleskes August 7, 2018 09:41
Contributor

@bleskes bleskes left a comment

I left some initial questions.

@@ -487,6 +497,7 @@ private void updateGlobalCheckpoint(final String allocationId, final long global
        if (cps != null && globalCheckpoint > cps.globalCheckpoint) {
            ifUpdated.accept(cps.globalCheckpoint);
            cps.globalCheckpoint = globalCheckpoint;
            onGlobalCheckpointUpdated.accept(globalCheckpoint);
Contributor

I'm unsure whether this should be called outside the lock. I'm leaning towards yes. Thoughts?

Contributor

Also, can you clarify why the consumer is called when updating the primary's knowledge of a replica's global checkpoint? (This method is used there too.)

Member Author

Good catch. That was not intended. I pushed a477ff4.

Member Author

Regarding not invoking the notification under lock: I wanted to avoid complicating these methods (I like that we have synchronized as a method modifier) and to avoid dealing with the fact that some of these updates can occur under nested invocations of synchronized methods. Notifying outside the lock would mean returning booleans and dropping synchronized from the method modifiers. In the POC that I have, the callback is "cheap" because it forks invocation of the listeners to another thread (in practice, the listener thread pool):

    private void notifyListeners(final long globalCheckpoint, final IndexShardClosedException e) {
        assert (globalCheckpoint == UNASSIGNED_SEQ_NO && e != null) || (globalCheckpoint >= NO_OPS_PERFORMED && e == null);
        if (listeners != null) {
            final List<GlobalCheckpointListener> currentListeners;
            synchronized (this) {
                currentListeners = listeners;
                listeners = null;
            }
            if (currentListeners != null) {
                executor.execute(() -> {
                    for (final GlobalCheckpointListener listener : currentListeners) {
                        try {
                            listener.accept(globalCheckpoint, e);
                        } catch (final Exception caught) {
                            if (globalCheckpoint != UNASSIGNED_SEQ_NO) {
                                logger.warn(
                                        new ParameterizedMessage(
                                                "error notifying global checkpoint listener of updated global checkpoint [{}]",
                                                globalCheckpoint),
                                        caught);
                            } else {
                                logger.warn("error notifying global checkpoint listener of closed shard", caught);
                            }
                        }
                    }
                });
            }
        }
    }

I think this is okay?

Contributor

I'm fine with this if you intend to fork to other threads in the outer layers. Indeed, the simplicity of the current solution is what had me leaning towards keeping it as you did.
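
For comparison, the alternative weighed in this thread (notifying outside the lock) could look roughly like the sketch below. It is not code from this PR: the class OutOfLockNotifierSketch and its members are hypothetical, and it uses an explicit mutex instead of synchronized method modifiers so that the update can report whether anything changed before the lock is released.

```java
import java.util.function.LongConsumer;

/** Hypothetical sketch: decide under the lock, notify after releasing it. */
final class OutOfLockNotifierSketch {
    private final Object mutex = new Object();
    private final LongConsumer onGlobalCheckpointUpdated;
    private long globalCheckpoint = -1L; // assumed "no operations performed" sentinel

    OutOfLockNotifierSketch(final LongConsumer onGlobalCheckpointUpdated) {
        this.onGlobalCheckpointUpdated = onGlobalCheckpointUpdated;
    }

    void updateGlobalCheckpoint(final long candidate) {
        final boolean updated;
        synchronized (mutex) {
            // Only the state change happens while holding the lock.
            updated = candidate > globalCheckpoint;
            if (updated) {
                globalCheckpoint = candidate;
            }
        }
        if (updated) {
            // The callback runs without the lock, so a slow consumer cannot block
            // other updates, but concurrent callers may deliver values out of order.
            onGlobalCheckpointUpdated.accept(candidate);
        }
    }

    long getGlobalCheckpoint() {
        synchronized (mutex) {
            return globalCheckpoint;
        }
    }

    /** Exposed only so the sketch can show that the lock is not held in the callback. */
    boolean lockHeldByCurrentThread() {
        return Thread.holdsLock(mutex);
    }
}
```

The trade-off matches the discussion above: this shape needs a boolean result and an explicit lock, whereas calling a cheap callback under the lock keeps the synchronized methods unchanged.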

@jasontedor
Member Author

Thanks @bleskes, that was a good catch. I responded to your feedback.

Contributor

@bleskes bleskes left a comment

Production code LGTM. Left some nits on the testing.

        final long globalCheckpoint = tracker.getGlobalCheckpoint();
        updatedGlobalCheckpoint.set(globalCheckpoint);
        tracker.updateLocalCheckpoint(allocationId, localCheckpoint);
        if (globalCheckpoint == tracker.getGlobalCheckpoint()) {
Contributor

Logically, this is the same as always checking assertThat(updatedGlobalCheckpoint.get(), equalTo(tracker.getGlobalCheckpoint())), no?

Member Author

I pushed 614a350.

@@ -99,6 +101,17 @@ private static IndexShardRoutingTable routingTable(final Set<AllocationId> initi
        return allocationIds.stream().map(AllocationId::getId).collect(Collectors.toSet());
    }

    private void updateLocalCheckpoint(final ReplicationTracker tracker, final String allocationId, final long localCheckpoint) {
        final long globalCheckpoint = tracker.getGlobalCheckpoint();
        updatedGlobalCheckpoint.set(globalCheckpoint);
Contributor

Why do you manually set it here? I think this method is only meant to check that the callback is called, and the callback itself sets updatedGlobalCheckpoint.
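
The two test nits above can be illustrated with a minimal sketch. It assumes the callback is the only writer of updatedGlobalCheckpoint and that the holder starts at the tracker's initial value; under those assumptions the unconditional comparison holds whether or not an update advanced the checkpoint. TinyTracker and callbackMatchesTracker are hypothetical names, not the helper from this PR's tests.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

final class CallbackAssertionSketch {

    /** Hypothetical single-copy tracker that reports advances through a callback. */
    static final class TinyTracker {
        private final LongConsumer onUpdated;
        private long globalCheckpoint = -1L; // assumed initial value

        TinyTracker(final LongConsumer onUpdated) {
            this.onUpdated = onUpdated;
        }

        synchronized void advanceTo(final long candidate) {
            if (candidate > globalCheckpoint) {
                globalCheckpoint = candidate;
                onUpdated.accept(candidate);
            }
        }

        synchronized long getGlobalCheckpoint() {
            return globalCheckpoint;
        }
    }

    /**
     * Applies the update, then compares the value recorded by the callback with the
     * tracker's state. No manual set and no branch on "did it change" is needed.
     */
    static boolean callbackMatchesTracker(
            final TinyTracker tracker, final AtomicLong updatedGlobalCheckpoint, final long candidate) {
        tracker.advanceTo(candidate);
        return updatedGlobalCheckpoint.get() == tracker.getGlobalCheckpoint();
    }
}
```

A test would create the AtomicLong with the tracker's initial value, pass its set method as the callback, and assert that callbackMatchesTracker returns true after every update.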

@jasontedor
Member Author

@bleskes Will you take one last look?

Contributor

@bleskes bleskes left a comment

LGTM

@jasontedor jasontedor merged commit dcc8164 into elastic:master Aug 7, 2018
jasontedor added a commit that referenced this pull request Aug 7, 2018
@jasontedor jasontedor deleted the replication-tracker-updates branch August 7, 2018 19:14
@jimczi jimczi added v7.0.0-beta1 and removed v7.0.0 labels Feb 7, 2019
Labels
:Distributed/Distributed, >enhancement, v6.5.0, v7.0.0-beta1

5 participants