Skip to content

Commit

Permalink
Always attach system user to internal actions (#43902)
Browse files Browse the repository at this point in the history
All valid licenses permit security, and the only license state where
we don't support security is when there is a missing license.
However, for safety we should attach the system (or xpack/security)
user to internally originated actions even if the license is missing
(or, more strictly, doesn't support security).

This allows all nodes to communicate and send internal actions (shard
state, handshake/pings, etc) even if a license is transitioning
between a broken state and a valid state.

Relates: #42215
Backport of: #43468
  • Loading branch information
tvernum authored Jul 3, 2019
1 parent cd29722 commit deacc20
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,50 +102,55 @@ public AsyncSender interceptSender(AsyncSender sender) {
@Override
public <T extends TransportResponse> void sendRequest(Transport.Connection connection, String action, TransportRequest request,
TransportRequestOptions options, TransportResponseHandler<T> handler) {
// make a local copy of isStateNotRecovered as this is a volatile variable and it
// is used multiple times in the method. The copy to a local variable allows us to
// guarantee we use the same value wherever we would check the value for the state
// being recovered
final boolean stateNotRecovered = isStateNotRecovered;
final boolean sendWithAuth = licenseState.isAuthAllowed() || stateNotRecovered;
if (sendWithAuth) {
// the transport in core normally does this check, BUT since we are serializing to a string header we need to do it
// ourselves otherwise we wind up using a version newer than what we can actually send
final Version minVersion = Version.min(connection.getVersion(), Version.CURRENT);

// Sometimes a system action gets executed like a internal create index request or update mappings request
// which means that the user is copied over to system actions so we need to change the user
if (AuthorizationUtils.shouldReplaceUserWithSystem(threadPool.getThreadContext(), action)) {
securityContext.executeAsUser(SystemUser.INSTANCE, (original) -> sendWithUser(connection, action, request, options,
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original)
, handler), sender, stateNotRecovered), minVersion);
} else if (AuthorizationUtils.shouldSetUserBasedOnActionOrigin(threadPool.getThreadContext())) {
AuthorizationUtils.switchUserBasedOnActionOriginAndExecute(threadPool.getThreadContext(), securityContext,
(original) -> sendWithUser(connection, action, request, options,
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original)
, handler), sender, stateNotRecovered));
} else if (securityContext.getAuthentication() != null &&
securityContext.getAuthentication().getVersion().equals(minVersion) == false) {
// re-write the authentication since we want the authentication version to match the version of the connection
securityContext.executeAfterRewritingAuthentication(original -> sendWithUser(connection, action, request, options,
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler), sender,
stateNotRecovered), minVersion);
} else {
sendWithUser(connection, action, request, options, handler, sender, stateNotRecovered);
}
final boolean requireAuth = shouldRequireExistingAuthentication();
// the transport in core normally does this check, BUT since we are serializing to a string header we need to do it
// ourselves otherwise we wind up using a version newer than what we can actually send
final Version minVersion = Version.min(connection.getVersion(), Version.CURRENT);

// Sometimes a system action gets executed like a internal create index request or update mappings request
// which means that the user is copied over to system actions so we need to change the user
if (AuthorizationUtils.shouldReplaceUserWithSystem(threadPool.getThreadContext(), action)) {
securityContext.executeAsUser(SystemUser.INSTANCE, (original) -> sendWithUser(connection, action, request, options,
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original)
, handler), sender, requireAuth), minVersion);
} else if (AuthorizationUtils.shouldSetUserBasedOnActionOrigin(threadPool.getThreadContext())) {
AuthorizationUtils.switchUserBasedOnActionOriginAndExecute(threadPool.getThreadContext(), securityContext,
(original) -> sendWithUser(connection, action, request, options,
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original)
, handler), sender, requireAuth));
} else if (securityContext.getAuthentication() != null &&
securityContext.getAuthentication().getVersion().equals(minVersion) == false) {
// re-write the authentication since we want the authentication version to match the version of the connection
securityContext.executeAfterRewritingAuthentication(original -> sendWithUser(connection, action, request, options,
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler), sender,
requireAuth), minVersion);
} else {
sender.sendRequest(connection, action, request, options, handler);
sendWithUser(connection, action, request, options, handler, sender, requireAuth);
}
}
};
}

/**
* Based on the current cluster state &amp; license, should we require that all outgoing actions have an authentication header
* of some sort?
*/
private boolean shouldRequireExistingAuthentication() {
// If the license state is MISSING, then auth is not allowed.
// However this makes it difficult to installing a valid license, because that might implicitly turn on security.
// When security is enabled on the master node it will then reject any actions that do not have authentication headers
// but there may be in-flight internal actions (that will not have authentication headers) such as "cluster/shard/started"
// which we don't want to reject.
// So, we always send authentication headers for actions that have an implied user (system-user or explicit-origin)
// and then for other (user originated) actions we enforce that there is an authentication header that we can send, iff the
// current license allows authentication.
return licenseState.isAuthAllowed() && isStateNotRecovered == false;
}

private <T extends TransportResponse> void sendWithUser(Transport.Connection connection, String action, TransportRequest request,
TransportRequestOptions options, TransportResponseHandler<T> handler,
AsyncSender sender, final boolean stateNotRecovered) {
// There cannot be a request outgoing from this node that is not associated with a user
// unless we do not know the actual license of the cluster
if (securityContext.getAuthentication() == null && stateNotRecovered == false) {
AsyncSender sender, final boolean requireAuthentication) {
if (securityContext.getAuthentication() == null && requireAuthentication) {
// we use an assertion here to ensure we catch this in our testing infrastructure, but leave the ISE for cases we do not catch
// in tests and may be hit by a user
assertNoAuthentication(action);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.elasticsearch.xpack.security.transport;

import org.elasticsearch.Version;
import org.elasticsearch.action.main.MainAction;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
Expand Down Expand Up @@ -45,14 +46,15 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

public class SecurityServerTransportInterceptorTests extends ESTestCase {
Expand Down Expand Up @@ -82,28 +84,61 @@ public void stopThreadPool() throws Exception {
terminate(threadPool);
}

public void testSendAsyncUnlicensed() {
public void testSendAsyncUserActionWhenUnlicensed() {
SecurityServerTransportInterceptor interceptor = new SecurityServerTransportInterceptor(settings, threadPool,
mock(AuthenticationService.class), mock(AuthorizationService.class), xPackLicenseState, mock(SSLService.class),
securityContext, new DestructiveOperations(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))), clusterService);
ClusterServiceUtils.setState(clusterService, clusterService.state()); // force state update to trigger listener
when(xPackLicenseState.isAuthAllowed()).thenReturn(false);
AtomicBoolean calledWrappedSender = new AtomicBoolean(false);
AtomicReference<User> sendingUser = new AtomicReference<>();
AsyncSender sender = interceptor.interceptSender(new AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(Transport.Connection connection, String action, TransportRequest request,
TransportRequestOptions options, TransportResponseHandler<T> handler) {
if (calledWrappedSender.compareAndSet(false, true) == false) {
fail("sender called more than once!");
}
sendingUser.set(securityContext.getUser());
}
});
sender.sendRequest(null, null, null, null, null);
Connection connection = mock(Connection.class);
when(connection.getVersion()).thenReturn(Version.CURRENT);
sender.sendRequest(connection, MainAction.NAME, null, null, null);
assertTrue(calledWrappedSender.get());
assertThat(sendingUser.get(), nullValue());
verify(xPackLicenseState).isAuthAllowed();
verifyNoMoreInteractions(xPackLicenseState);
verifyZeroInteractions(securityContext);
}

public void testSendAsyncInternalActionWhenUnlicensed() {
SecurityServerTransportInterceptor interceptor = new SecurityServerTransportInterceptor(settings, threadPool,
mock(AuthenticationService.class), mock(AuthorizationService.class), xPackLicenseState, mock(SSLService.class),
securityContext, new DestructiveOperations(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))), clusterService);
ClusterServiceUtils.setState(clusterService, clusterService.state()); // force state update to trigger listener
when(xPackLicenseState.isAuthAllowed()).thenReturn(false);
AtomicBoolean calledWrappedSender = new AtomicBoolean(false);
AtomicReference<User> sendingUser = new AtomicReference<>();
AsyncSender sender = interceptor.interceptSender(new AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(Transport.Connection connection, String action, TransportRequest request,
TransportRequestOptions options, TransportResponseHandler<T> handler) {
if (calledWrappedSender.compareAndSet(false, true) == false) {
fail("sender called more than once!");
}
sendingUser.set(securityContext.getUser());
}
});
Connection connection = mock(Connection.class);
when(connection.getVersion()).thenReturn(Version.CURRENT);
sender.sendRequest(connection, "internal:foo", null, null, null);
assertTrue(calledWrappedSender.get());
assertThat(sendingUser.get(), is(SystemUser.INSTANCE));
verify(xPackLicenseState).isAuthAllowed();
verify(securityContext).executeAsUser(any(User.class), any(Consumer.class), eq(Version.CURRENT));
verifyNoMoreInteractions(xPackLicenseState);
}

public void testSendAsyncWithStateNotRecovered() {
Expand Down

0 comments on commit deacc20

Please sign in to comment.