Skip to content

Commit

Permalink
Fixed query thread leak (#222)
Browse files Browse the repository at this point in the history
* Fixed query thread leak
* Updated version to 2.1.2
  • Loading branch information
mfateev authored Oct 29, 2018
1 parent 44296d2 commit 6df06a2
Show file tree
Hide file tree
Showing 12 changed files with 255 additions and 129 deletions.
10 changes: 9 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,12 @@
- Metrics and Logging support in client.
- Side effects, mutable side effects, random uuid and workflow getVersion support.
- Activity heartbeat throttling.
- Deterministic retry of failed operation.
- Deterministic retry of failed operation.

## v2.1.2
- Requires minimum server release v0.4.0
- Introduced WorkerFactory and FactoryOptions
- Added sticky workflow execution, which is caching of a workflow object between decisions. It is enabled by default,
to disable use FactoryOptions.disableStickyExecution property.
- Updated Thrift to expose new types of service exceptions: ServiceBusyError, DomainNotActiveError, LimitExceededError
- Added metric for corrupted signal as well as metrics related to caching and evictions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ googleJavaFormat {
}

group = 'com.uber.cadence'
version = '2.1.0'
version = '2.1.2'

description = """Uber Cadence Java Client"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,12 @@ private Result handleDecisionTaskImpl(PollForDecisionTaskResponse decisionTask)
}

private Result processDecision(PollForDecisionTaskResponse decisionTask) throws Throwable {
Decider decider =
stickyTaskListName == null
? createDecider(decisionTask)
: cache.getOrCreate(decisionTask, this::createDecider);
Decider decider = null;
try {
decider =
stickyTaskListName == null
? createDecider(decisionTask)
: cache.getOrCreate(decisionTask, this::createDecider);
List<Decision> decisions = decider.decide(decisionTask);
if (log.isTraceEnabled()) {
WorkflowExecution execution = decisionTask.getWorkflowExecution();
Expand All @@ -146,13 +147,13 @@ private Result processDecision(PollForDecisionTaskResponse decisionTask) throws
+ " new decisions");
}
return createCompletedRequest(decisionTask, decisions);
} catch (Exception e) {
} catch (Throwable e) {
if (stickyTaskListName != null) {
cache.invalidate(decisionTask);
}
throw e;
} finally {
if (stickyTaskListName == null) {
if (stickyTaskListName == null && decider != null) {
decider.close();
}
}
Expand All @@ -161,8 +162,9 @@ private Result processDecision(PollForDecisionTaskResponse decisionTask) throws
private Result processQuery(PollForDecisionTaskResponse decisionTask) {
RespondQueryTaskCompletedRequest queryCompletedRequest = new RespondQueryTaskCompletedRequest();
queryCompletedRequest.setTaskToken(decisionTask.getTaskToken());
Decider decider = null;
try {
Decider decider =
decider =
stickyTaskListName == null
? createDecider(decisionTask)
: cache.getOrCreate(decisionTask, this::createDecider);
Expand All @@ -176,6 +178,10 @@ private Result processQuery(PollForDecisionTaskResponse decisionTask) {
e.printStackTrace(pw);
queryCompletedRequest.setErrorMessage(sw.toString());
queryCompletedRequest.setCompletedType(QueryTaskCompletedType.FAILED);
} finally {
if (stickyTaskListName == null && decider != null) {
decider.close();
}
}
return new Result(null, null, queryCompletedRequest, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,13 +206,15 @@ private Object queryWorkflow(Method method, QueryMethod queryMethod, Object[] ar
private Object startWorkflow(Method method, Object[] args) {
Optional<WorkflowOptions> options = untyped.getOptions();
if (untyped.getExecution() == null
|| options.get().getWorkflowIdReusePolicy() == WorkflowIdReusePolicy.AllowDuplicate) {
|| (options.isPresent()
&& options.get().getWorkflowIdReusePolicy() == WorkflowIdReusePolicy.AllowDuplicate)) {
try {
untyped.start(args);
} catch (DuplicateWorkflowException e) {
// We do allow duplicated calls if policy is not AllowDuplicate. Semantic is to wait for
// result.
if (options.get().getWorkflowIdReusePolicy() == WorkflowIdReusePolicy.AllowDuplicate) {
if (options.isPresent()
&& options.get().getWorkflowIdReusePolicy() == WorkflowIdReusePolicy.AllowDuplicate) {
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public void start() {

getDecisionContext()
.getMetricsScope()
.gauge(MetricsType.STICKY_CACHE_SIZE)
.gauge(MetricsType.WORKFLOW_ACTIVE_THREAD_COUNT)
.update(((ThreadPoolExecutor) threadPool).getActiveCount());

try {
Expand Down
40 changes: 30 additions & 10 deletions src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,16 @@

package com.uber.cadence.internal.worker;

import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
import com.uber.cadence.PollForDecisionTaskResponse;
import com.uber.cadence.RespondDecisionTaskCompletedRequest;
import com.uber.cadence.RespondDecisionTaskFailedRequest;
import com.uber.cadence.RespondQueryTaskCompletedRequest;
import com.uber.cadence.WorkflowExecution;
import com.uber.cadence.WorkflowQuery;
import com.uber.cadence.*;
import com.uber.cadence.common.RetryOptions;
import com.uber.cadence.internal.common.Retryer;
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
import com.uber.cadence.internal.logging.LoggerTag;
import com.uber.cadence.internal.metrics.MetricsType;
import com.uber.cadence.serviceclient.IWorkflowService;
import com.uber.m3.tally.Stopwatch;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand Down Expand Up @@ -112,17 +108,41 @@ public byte[] queryWorkflowExecution(WorkflowExecution execution, String queryTy
task.setWorkflowExecution(execution);
task.setStartedEventId(Long.MAX_VALUE);
task.setPreviousStartedEventId(Long.MAX_VALUE);
task.setWorkflowType(task.getWorkflowType());
WorkflowQuery query = new WorkflowQuery();
query.setQueryType(queryType).setQueryArgs(args);
task.setQuery(query);
GetWorkflowExecutionHistoryResponse history =
GetWorkflowExecutionHistoryResponse historyResponse =
WorkflowExecutionUtils.getHistoryPage(null, service, domain, execution);
task.setHistory(history.getHistory());
History history = historyResponse.getHistory();

List<HistoryEvent> events = history.getEvents();
if (events == null || events.isEmpty()) {
throw new IllegalStateException("Empty history for " + execution);
}
HistoryEvent startedEvent = events.get(0);
WorkflowExecutionStartedEventAttributes started =
startedEvent.getWorkflowExecutionStartedEventAttributes();
if (started == null) {
throw new IllegalStateException(
"First event of the history is not WorkflowExecutionStarted: " + startedEvent);
}
WorkflowType workflowType = started.getWorkflowType();
task.setWorkflowType(workflowType);
task.setHistory(history);
DecisionTaskHandler.Result result = handler.handleDecisionTask(task);
if (result.getQueryCompleted() != null) {
RespondQueryTaskCompletedRequest r = result.getQueryCompleted();
if (r.getErrorMessage() != null) {
throw new RuntimeException(
"query failure for "
+ execution
+ ", queryType="
+ queryType
+ ", args="
+ Arrays.toString(args)
+ ", error="
+ r.getErrorMessage());
}
return r.getQueryResult();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ public TestEnvironmentOptions build() {
}

if (factoryOptions == null) {
factoryOptions = new Worker.FactoryOptions.Builder().build();
factoryOptions =
new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build();
}

return new TestEnvironmentOptions(
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/com/uber/cadence/worker/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -595,8 +595,7 @@ enum State {

public static class FactoryOptions {
public static class Builder {
// TODO: Enable by default as soon the service is released
private boolean disableStickyExecution = true;
private boolean disableStickyExecution;
private int stickyDecisionScheduleToStartTimeoutInSeconds = 5;
private int cacheMaximumSize = 600;
private int maxWorkflowThreadCount = 600;
Expand Down
82 changes: 57 additions & 25 deletions src/test/java/com/uber/cadence/worker/StickyWorkerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import com.uber.cadence.internal.metrics.MetricsTag;
import com.uber.cadence.internal.metrics.MetricsType;
import com.uber.cadence.internal.replay.DeciderCache;
import com.uber.cadence.serviceclient.IWorkflowService;
import com.uber.cadence.serviceclient.WorkflowServiceTChannel;
import com.uber.cadence.testing.TestEnvironmentOptions;
import com.uber.cadence.testing.TestWorkflowEnvironment;
import com.uber.cadence.workflow.Async;
Expand All @@ -56,24 +58,19 @@
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.*;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// @RunWith(Parameterized.class)
@Ignore
@RunWith(Parameterized.class)
public class StickyWorkerTest {
public static final String DOMAIN = "UnitTest";

// TODO: Enable for docker as soon as the server commit
// a36c84991664571636d37a3826b282ddbdbd2402 is released
private static final boolean skipDockerService = true;
// Boolean.parseBoolean(System.getenv("SKIP_DOCKER_SERVICE"));
private static final boolean skipDockerService =
Boolean.parseBoolean(System.getenv("SKIP_DOCKER_SERVICE"));

@Parameterized.Parameter public boolean useExternalService;

Expand All @@ -91,6 +88,15 @@ public static Object[] data() {

@Rule public TestName testName = new TestName();

private IWorkflowService service;

@Before
public void setUp() {
if (testType.equals("Docker") && service == null) {
service = new WorkflowServiceTChannel();
}
}

@Test
public void whenStickyIsEnabledThenTheWorkflowIsCachedSignals() throws Exception {
// Arrange
Expand All @@ -104,7 +110,10 @@ public void whenStickyIsEnabledThenTheWorkflowIsCachedSignals() throws Exception

TestEnvironmentWrapper wrapper =
new TestEnvironmentWrapper(
new Worker.FactoryOptions.Builder().setMetricScope(scope).build());
new Worker.FactoryOptions.Builder()
.setDisableStickyExecution(false)
.setMetricScope(scope)
.build());
Worker.Factory factory = wrapper.getWorkerFactory();
Worker worker = factory.newWorker(taskListName, new WorkerOptions.Builder().build());
worker.registerWorkflowImplementationTypes(GreetingSignalWorkflowImpl.class);
Expand Down Expand Up @@ -159,7 +168,10 @@ public void whenStickyIsEnabledThenTheWorkflowIsCachedActivities() throws Except

TestEnvironmentWrapper wrapper =
new TestEnvironmentWrapper(
new Worker.FactoryOptions.Builder().setMetricScope(scope).build());
new Worker.FactoryOptions.Builder()
.setDisableStickyExecution(false)
.setMetricScope(scope)
.build());
Worker.Factory factory = wrapper.getWorkerFactory();
Worker worker = factory.newWorker(taskListName, new WorkerOptions.Builder().build());
worker.registerWorkflowImplementationTypes(ActivitiesWorkflowImpl.class);
Expand Down Expand Up @@ -214,7 +226,10 @@ public void whenStickyIsEnabledThenTheWorkflowIsCachedChildWorkflows() throws Ex

TestEnvironmentWrapper wrapper =
new TestEnvironmentWrapper(
new Worker.FactoryOptions.Builder().setMetricScope(scope).build());
new Worker.FactoryOptions.Builder()
.setDisableStickyExecution(false)
.setMetricScope(scope)
.build());
Worker.Factory factory = wrapper.getWorkerFactory();
Worker worker = factory.newWorker(taskListName, new WorkerOptions.Builder().build());
worker.registerWorkflowImplementationTypes(
Expand Down Expand Up @@ -262,7 +277,10 @@ public void whenStickyIsEnabledThenTheWorkflowIsCachedMutableSideEffect() throws

TestEnvironmentWrapper wrapper =
new TestEnvironmentWrapper(
new Worker.FactoryOptions.Builder().setMetricScope(scope).build());
new Worker.FactoryOptions.Builder()
.setDisableStickyExecution(false)
.setMetricScope(scope)
.build());
Worker.Factory factory = wrapper.getWorkerFactory();
Worker worker = factory.newWorker(taskListName, new WorkerOptions.Builder().build());
worker.registerWorkflowImplementationTypes(TestMutableSideEffectWorkflowImpl.class);
Expand Down Expand Up @@ -343,7 +361,8 @@ public void whenCacheIsEvictedTheWorkerCanRecover() throws Exception {
// Arrange
String taskListName = "evictedStickyTest";
TestEnvironmentWrapper wrapper =
new TestEnvironmentWrapper(new Worker.FactoryOptions.Builder().build());
new TestEnvironmentWrapper(
new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build());
Worker.Factory factory = wrapper.getWorkerFactory();
Worker worker = factory.newWorker(taskListName);
worker.registerWorkflowImplementationTypes(GreetingSignalWorkflowImpl.class);
Expand Down Expand Up @@ -382,7 +401,8 @@ public void workflowsCanBeQueried() throws Exception {
// Arrange
String taskListName = "queryStickyTest";
TestEnvironmentWrapper wrapper =
new TestEnvironmentWrapper(new Worker.FactoryOptions.Builder().build());
new TestEnvironmentWrapper(
new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build());
Worker.Factory factory = wrapper.getWorkerFactory();
Worker worker = factory.newWorker(taskListName);
worker.registerWorkflowImplementationTypes(GreetingSignalWorkflowImpl.class);
Expand Down Expand Up @@ -422,7 +442,8 @@ public void workflowsCanBeQueriedAfterEviction() throws Exception {
// Arrange
String taskListName = "queryEvictionStickyTest";
TestEnvironmentWrapper wrapper =
new TestEnvironmentWrapper(new Worker.FactoryOptions.Builder().build());
new TestEnvironmentWrapper(
new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build());
Worker.Factory factory = wrapper.getWorkerFactory();
Worker worker = factory.newWorker(taskListName);
worker.registerWorkflowImplementationTypes(GreetingSignalWorkflowImpl.class);
Expand Down Expand Up @@ -467,25 +488,36 @@ private class TestEnvironmentWrapper {

public TestEnvironmentWrapper(Worker.FactoryOptions options) {
if (options == null) {
options = new Worker.FactoryOptions.Builder().build();
options = new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build();
}
if (useExternalService) {
factory = new Worker.Factory(service, DOMAIN, options);
} else {
TestEnvironmentOptions testOptions =
new TestEnvironmentOptions.Builder()
.setDomain(DOMAIN)
.setFactoryOptions(options)
.build();
testEnv = TestWorkflowEnvironment.newInstance(testOptions);
}
factory = new Worker.Factory(DOMAIN, options);
TestEnvironmentOptions testOptions =
new TestEnvironmentOptions.Builder().setDomain(DOMAIN).setFactoryOptions(options).build();
testEnv = TestWorkflowEnvironment.newInstance(testOptions);
}

private Worker.Factory getWorkerFactory() {
return useExternalService ? factory : testEnv.getWorkerFactory();
}

private WorkflowClient getWorkflowClient() {
return useExternalService ? WorkflowClient.newInstance(DOMAIN) : testEnv.newWorkflowClient();
return useExternalService
? WorkflowClient.newInstance(service, DOMAIN)
: testEnv.newWorkflowClient();
}

private void close() {
factory.shutdown(Duration.ofSeconds(1));
testEnv.close();
if (useExternalService) {
factory.shutdown(Duration.ofSeconds(1));
} else {
testEnv.close();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ private class TestEnvironmentWrapper {

public TestEnvironmentWrapper(Worker.FactoryOptions options) {
if (options == null) {
options = new Worker.FactoryOptions.Builder().build();
options = new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build();
}
factory = new Worker.Factory(DOMAIN, options);
TestEnvironmentOptions testOptions =
Expand Down
Loading

0 comments on commit 6df06a2

Please sign in to comment.