Skip to content

Commit

Permalink
Add ability to interrupt commands
Browse files Browse the repository at this point in the history
Add the ability to mark an invocation of a command as interruptible. If another command is executed while this command is running, the command will be interrupted.

Closes #12526.

PiperOrigin-RevId: 346652616
  • Loading branch information
daveyc123 authored and copybara-github committed Dec 9, 2020
1 parent f1dca86 commit 65d18a5
Show file tree
Hide file tree
Showing 9 changed files with 359 additions and 13 deletions.
8 changes: 6 additions & 2 deletions src/main/cpp/blaze.cc
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ class BlazeServer final {
const int connect_timeout_secs_;
const bool batch_;
const bool block_for_lock_;
const bool preemptible_;
const blaze_util::Path output_base_;
};

Expand Down Expand Up @@ -1050,8 +1051,9 @@ static bool IsVolatileArg(const string &arg) {
// not used at server startup to be part of the startup command line. The
// server command line difference logic can be simplified then.
static const std::set<string> volatile_startup_options = {
"--option_sources=", "--max_idle_secs=", "--connect_timeout_secs=",
"--local_startup_timeout_secs=", "--client_debug="};
"--option_sources=", "--max_idle_secs=",
"--connect_timeout_secs=", "--local_startup_timeout_secs=",
"--client_debug=", "--preemptible="};

// Split arg based on the first "=" if one exists in arg.
const string::size_type eq_pos = arg.find_first_of('=');
Expand Down Expand Up @@ -1671,6 +1673,7 @@ BlazeServer::BlazeServer(const StartupOptions &startup_options)
connect_timeout_secs_(startup_options.connect_timeout_secs),
batch_(startup_options.batch),
block_for_lock_(startup_options.block_for_lock),
preemptible_(startup_options.preemptible),
output_base_(startup_options.output_base) {
if (!startup_options.client_debug) {
gpr_set_log_function(null_grpc_log_function);
Expand Down Expand Up @@ -1945,6 +1948,7 @@ unsigned int BlazeServer::Communicate(
command_server::RunRequest request;
request.set_cookie(request_cookie_);
request.set_block_for_lock(block_for_lock_);
request.set_preemptible(preemptible_);
request.set_client_description("pid=" + blaze::GetProcessIdAsString());
for (const string &arg : arg_vector) {
request.add_arg(arg);
Expand Down
2 changes: 2 additions & 0 deletions src/main/cpp/startup_options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ StartupOptions::StartupOptions(const string &product_name,
local_startup_timeout_secs(120),
have_invocation_policy_(false),
client_debug(false),
preemptible(false),
java_logging_formatter(
"com.google.devtools.build.lib.util.SingleLineFormatter"),
expand_configs_in_place(true),
Expand Down Expand Up @@ -130,6 +131,7 @@ StartupOptions::StartupOptions(const string &product_name,
RegisterNullaryStartupFlag("batch_cpu_scheduling", &batch_cpu_scheduling);
RegisterNullaryStartupFlag("block_for_lock", &block_for_lock);
RegisterNullaryStartupFlag("client_debug", &client_debug);
RegisterNullaryStartupFlag("preemptible", &preemptible);
RegisterNullaryStartupFlag("expand_configs_in_place",
&expand_configs_in_place);
RegisterNullaryStartupFlag("fatal_event_bus_exceptions",
Expand Down
4 changes: 4 additions & 0 deletions src/main/cpp/startup_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,10 @@ class StartupOptions {
// Whether to output addition debugging information in the client.
bool client_debug;

// Whether the resulting command will be preempted if a subsequent command is
// run.
bool preemptible;

// Value of the java.util.logging.FileHandler.formatter Java property.
std::string java_logging_formatter;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,14 @@ public String getTypeDescription() {
+ "cause the server to restart.")
public boolean clientDebug;

@Option(
name = "preemptible",
defaultValue = "false", // NOTE: only for documentation, value is set and used by the client.
documentationCategory = OptionDocumentationCategory.BAZEL_CLIENT_OPTIONS,
effectTags = {OptionEffectTag.EAGERNESS_TO_EXIT},
help = "If true, the command can be preempted if another command is started.")
public boolean preemptible;

@Option(
name = "connect_timeout_secs",
defaultValue = "30", // NOTE: only for documentation, value is set and used by the client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,24 @@ class CommandManager {
idle();
}

void preemptEligibleCommands() {
synchronized (runningCommandsMap) {
ImmutableSet.Builder<String> commandsToInterruptBuilder = new ImmutableSet.Builder<>();

for (RunningCommand command : runningCommandsMap.values()) {
if (command.isPreemptible()) {
command.thread.interrupt();
commandsToInterruptBuilder.add(command.id);
}
}

ImmutableSet<String> commandsToInterrupt = commandsToInterruptBuilder.build();
if (!commandsToInterrupt.isEmpty()) {
startSlowInterruptWatcher(commandsToInterrupt);
}
}
}

void interruptInflightCommands() {
synchronized (runningCommandsMap) {
for (RunningCommand command : runningCommandsMap.values()) {
Expand All @@ -54,7 +72,7 @@ void interruptInflightCommands() {
}

void doCancel(CancelRequest request) {
try (RunningCommand cancelCommand = create()) {
try (RunningCommand cancelCommand = createCommand()) {
synchronized (runningCommandsMap) {
RunningCommand pendingCommand = runningCommandsMap.get(request.getCommandId());
if (pendingCommand != null) {
Expand Down Expand Up @@ -88,8 +106,19 @@ void waitForChange(long timeout) throws InterruptedException {
}
}

RunningCommand create() {
RunningCommand command = new RunningCommand();
RunningCommand createPreemptibleCommand() {
RunningCommand command = new RunningCommand(true);
registerCommand(command);
return command;
}

RunningCommand createCommand() {
RunningCommand command = new RunningCommand(false);
registerCommand(command);
return command;
}

private void registerCommand(RunningCommand command) {
synchronized (runningCommandsMap) {
if (runningCommandsMap.isEmpty()) {
busy();
Expand All @@ -98,7 +127,6 @@ RunningCommand create() {
runningCommandsMap.notify();
}
logger.atInfo().log("Starting command %s on thread %s", command.id, command.thread.getName());
return command;
}

private void idle() {
Expand Down Expand Up @@ -148,10 +176,12 @@ private void startSlowInterruptWatcher(final ImmutableSet<String> commandIds) {
class RunningCommand implements AutoCloseable {
private final Thread thread;
private final String id;
private final boolean preemptible;

private RunningCommand() {
private RunningCommand(boolean preemptible) {
thread = Thread.currentThread();
id = UUID.randomUUID().toString();
this.preemptible = preemptible;
}

@Override
Expand All @@ -170,5 +200,9 @@ public void close() {
String getId() {
return id;
}

boolean isPreemptible() {
return this.preemptible;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,12 @@ private void executeCommand(RunRequest request, BlockingStreamObserver<RunRespon
option.getOption().toString(StandardCharsets.ISO_8859_1)));
}

try (RunningCommand command = commandManager.create()) {
commandManager.preemptEligibleCommands();

try (RunningCommand command =
request.getPreemptible()
? commandManager.createPreemptibleCommand()
: commandManager.createCommand()) {
commandId = command.getId();

try {
Expand Down Expand Up @@ -608,7 +613,7 @@ public void run(final RunRequest request, final StreamObserver<RunResponse> obse

@Override
public void ping(PingRequest pingRequest, StreamObserver<PingResponse> streamObserver) {
try (RunningCommand command = commandManager.create()) {
try (RunningCommand command = commandManager.createCommand()) {
PingResponse.Builder response = PingResponse.newBuilder();
if (pingRequest.getCookie().equals(requestCookie)) {
response.setCookie(responseCookie);
Expand Down
4 changes: 4 additions & 0 deletions src/main/protobuf/command_server.proto
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ message RunRequest {
// came from. These options have already been parsed and already have had
// their effect. This information should only be used for logging.
repeated StartupOption startup_options = 6;

// Whether the resulting command can be preempted if additional commands
// are received.
bool preemptible = 7;
}

// Contains the a startup option with its source file. Uses bytes to preserve
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ public class CommandManagerTest {
public void testBasicOperationsOnSingleThread() {
CommandManager underTest = new CommandManager(/*doIdleServerTasks=*/ false);
assertThat(underTest.isEmpty()).isTrue();
try (RunningCommand firstCommand = underTest.create()) {
try (RunningCommand firstCommand = underTest.createCommand()) {
assertThat(underTest.isEmpty()).isFalse();
assertThat(isValidUuid(firstCommand.getId())).isTrue();
try (RunningCommand secondCommand = underTest.create()) {
try (RunningCommand secondCommand = underTest.createCommand()) {
assertThat(underTest.isEmpty()).isFalse();
assertThat(isValidUuid(secondCommand.getId())).isTrue();
assertThat(firstCommand.getId()).isNotEqualTo(secondCommand.getId());
Expand Down Expand Up @@ -75,11 +75,11 @@ public void testNotifiesOnBusyAndIdle() throws Exception {

// We want to ensure at each step that we are actively awaiting notification.
waitForThreadWaiting(waiting, thread);
try (RunningCommand firstCommand = underTest.create()) {
try (RunningCommand firstCommand = underTest.createCommand()) {
cyclicBarrier.await();
assertThat(notificationCounter.get()).isEqualTo(1);
waitForThreadWaiting(waiting, thread);
try (RunningCommand secondCommand = underTest.create()) {
try (RunningCommand secondCommand = underTest.createCommand()) {
cyclicBarrier.await();
assertThat(notificationCounter.get()).isEqualTo(2);
waitForThreadWaiting(waiting, thread);
Expand Down
Loading

0 comments on commit 65d18a5

Please sign in to comment.