From 6df06a2e99f8216afae49ece7b60c1a34447e5a3 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Sun, 28 Oct 2018 17:50:32 -0700 Subject: [PATCH] Fixed query thread leak (#222) * Fixed query thread leak * Updated version to 2.1.2 --- CHANGELOG.md | 10 +- build.gradle | 2 +- .../replay/ReplayDecisionTaskHandler.java | 20 +- .../sync/WorkflowInvocationHandler.java | 6 +- .../internal/sync/WorkflowThreadImpl.java | 2 +- .../internal/worker/WorkflowWorker.java | 40 +++- .../testing/TestEnvironmentOptions.java | 3 +- .../java/com/uber/cadence/worker/Worker.java | 3 +- .../uber/cadence/worker/StickyWorkerTest.java | 82 ++++--- .../cadence/worker/WorkerStressTests.java | 2 +- .../uber/cadence/workflow/WorkflowTest.java | 212 +++++++++++------- src/test/resources/logback-test.xml | 2 +- 12 files changed, 255 insertions(+), 129 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e045d123d..fde72fdaa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,4 +10,12 @@ - Metrics and Logging support in client. - Side effects, mutable side effects, random uuid and workflow getVersion support. - Activity heartbeat throttling. -- Deterministic retry of failed operation. \ No newline at end of file +- Deterministic retry of failed operation. + +## v2.1.2 +- Requires minimum server release v0.4.0 +- Introduced WorkerFactory and FactoryOptions +- Added sticky workflow execution, which is caching of a workflow object between decisions. It is enabled by default, +to disable use FactoryOptions.disableStickyExecution property. +- Updated Thrift to expose new types of service exceptions: ServiceBusyError, DomainNotActiveError, LimitExceededError +- Added metric for corrupted signal as well as metrics related to caching and evictions. \ No newline at end of file diff --git a/build.gradle b/build.gradle index cb34a4e78..05d2dd4d9 100644 --- a/build.gradle +++ b/build.gradle @@ -37,7 +37,7 @@ googleJavaFormat { } group = 'com.uber.cadence' -version = '2.1.0' +version = '2.1.2' description = """Uber Cadence Java Client""" diff --git a/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java b/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java index 87674dbae..9e975c684 100644 --- a/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java +++ b/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java @@ -115,11 +115,12 @@ private Result handleDecisionTaskImpl(PollForDecisionTaskResponse decisionTask) } private Result processDecision(PollForDecisionTaskResponse decisionTask) throws Throwable { - Decider decider = - stickyTaskListName == null - ? createDecider(decisionTask) - : cache.getOrCreate(decisionTask, this::createDecider); + Decider decider = null; try { + decider = + stickyTaskListName == null + ? createDecider(decisionTask) + : cache.getOrCreate(decisionTask, this::createDecider); List decisions = decider.decide(decisionTask); if (log.isTraceEnabled()) { WorkflowExecution execution = decisionTask.getWorkflowExecution(); @@ -146,13 +147,13 @@ private Result processDecision(PollForDecisionTaskResponse decisionTask) throws + " new decisions"); } return createCompletedRequest(decisionTask, decisions); - } catch (Exception e) { + } catch (Throwable e) { if (stickyTaskListName != null) { cache.invalidate(decisionTask); } throw e; } finally { - if (stickyTaskListName == null) { + if (stickyTaskListName == null && decider != null) { decider.close(); } } @@ -161,8 +162,9 @@ private Result processDecision(PollForDecisionTaskResponse decisionTask) throws private Result processQuery(PollForDecisionTaskResponse decisionTask) { RespondQueryTaskCompletedRequest queryCompletedRequest = new RespondQueryTaskCompletedRequest(); queryCompletedRequest.setTaskToken(decisionTask.getTaskToken()); + Decider decider = null; try { - Decider decider = + decider = stickyTaskListName == null ? createDecider(decisionTask) : cache.getOrCreate(decisionTask, this::createDecider); @@ -176,6 +178,10 @@ private Result processQuery(PollForDecisionTaskResponse decisionTask) { e.printStackTrace(pw); queryCompletedRequest.setErrorMessage(sw.toString()); queryCompletedRequest.setCompletedType(QueryTaskCompletedType.FAILED); + } finally { + if (stickyTaskListName == null && decider != null) { + decider.close(); + } } return new Result(null, null, queryCompletedRequest, null); } diff --git a/src/main/java/com/uber/cadence/internal/sync/WorkflowInvocationHandler.java b/src/main/java/com/uber/cadence/internal/sync/WorkflowInvocationHandler.java index 1210f6c64..b2c1d89f6 100644 --- a/src/main/java/com/uber/cadence/internal/sync/WorkflowInvocationHandler.java +++ b/src/main/java/com/uber/cadence/internal/sync/WorkflowInvocationHandler.java @@ -206,13 +206,15 @@ private Object queryWorkflow(Method method, QueryMethod queryMethod, Object[] ar private Object startWorkflow(Method method, Object[] args) { Optional options = untyped.getOptions(); if (untyped.getExecution() == null - || options.get().getWorkflowIdReusePolicy() == WorkflowIdReusePolicy.AllowDuplicate) { + || (options.isPresent() + && options.get().getWorkflowIdReusePolicy() == WorkflowIdReusePolicy.AllowDuplicate)) { try { untyped.start(args); } catch (DuplicateWorkflowException e) { // We do allow duplicated calls if policy is not AllowDuplicate. Semantic is to wait for // result. - if (options.get().getWorkflowIdReusePolicy() == WorkflowIdReusePolicy.AllowDuplicate) { + if (options.isPresent() + && options.get().getWorkflowIdReusePolicy() == WorkflowIdReusePolicy.AllowDuplicate) { throw e; } } diff --git a/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java b/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java index 2803fd639..60d29d2e7 100644 --- a/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java +++ b/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java @@ -228,7 +228,7 @@ public void start() { getDecisionContext() .getMetricsScope() - .gauge(MetricsType.STICKY_CACHE_SIZE) + .gauge(MetricsType.WORKFLOW_ACTIVE_THREAD_COUNT) .update(((ThreadPoolExecutor) threadPool).getActiveCount()); try { diff --git a/src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java b/src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java index 493555f9e..88afa85a6 100644 --- a/src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java @@ -17,13 +17,7 @@ package com.uber.cadence.internal.worker; -import com.uber.cadence.GetWorkflowExecutionHistoryResponse; -import com.uber.cadence.PollForDecisionTaskResponse; -import com.uber.cadence.RespondDecisionTaskCompletedRequest; -import com.uber.cadence.RespondDecisionTaskFailedRequest; -import com.uber.cadence.RespondQueryTaskCompletedRequest; -import com.uber.cadence.WorkflowExecution; -import com.uber.cadence.WorkflowQuery; +import com.uber.cadence.*; import com.uber.cadence.common.RetryOptions; import com.uber.cadence.internal.common.Retryer; import com.uber.cadence.internal.common.WorkflowExecutionUtils; @@ -31,6 +25,8 @@ import com.uber.cadence.internal.metrics.MetricsType; import com.uber.cadence.serviceclient.IWorkflowService; import com.uber.m3.tally.Stopwatch; +import java.util.Arrays; +import java.util.List; import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -112,17 +108,41 @@ public byte[] queryWorkflowExecution(WorkflowExecution execution, String queryTy task.setWorkflowExecution(execution); task.setStartedEventId(Long.MAX_VALUE); task.setPreviousStartedEventId(Long.MAX_VALUE); - task.setWorkflowType(task.getWorkflowType()); WorkflowQuery query = new WorkflowQuery(); query.setQueryType(queryType).setQueryArgs(args); task.setQuery(query); - GetWorkflowExecutionHistoryResponse history = + GetWorkflowExecutionHistoryResponse historyResponse = WorkflowExecutionUtils.getHistoryPage(null, service, domain, execution); - task.setHistory(history.getHistory()); + History history = historyResponse.getHistory(); + List events = history.getEvents(); + if (events == null || events.isEmpty()) { + throw new IllegalStateException("Empty history for " + execution); + } + HistoryEvent startedEvent = events.get(0); + WorkflowExecutionStartedEventAttributes started = + startedEvent.getWorkflowExecutionStartedEventAttributes(); + if (started == null) { + throw new IllegalStateException( + "First event of the history is not WorkflowExecutionStarted: " + startedEvent); + } + WorkflowType workflowType = started.getWorkflowType(); + task.setWorkflowType(workflowType); + task.setHistory(history); DecisionTaskHandler.Result result = handler.handleDecisionTask(task); if (result.getQueryCompleted() != null) { RespondQueryTaskCompletedRequest r = result.getQueryCompleted(); + if (r.getErrorMessage() != null) { + throw new RuntimeException( + "query failure for " + + execution + + ", queryType=" + + queryType + + ", args=" + + Arrays.toString(args) + + ", error=" + + r.getErrorMessage()); + } return r.getQueryResult(); } diff --git a/src/main/java/com/uber/cadence/testing/TestEnvironmentOptions.java b/src/main/java/com/uber/cadence/testing/TestEnvironmentOptions.java index 9ec39e17d..3db435e01 100644 --- a/src/main/java/com/uber/cadence/testing/TestEnvironmentOptions.java +++ b/src/main/java/com/uber/cadence/testing/TestEnvironmentOptions.java @@ -93,7 +93,8 @@ public TestEnvironmentOptions build() { } if (factoryOptions == null) { - factoryOptions = new Worker.FactoryOptions.Builder().build(); + factoryOptions = + new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build(); } return new TestEnvironmentOptions( diff --git a/src/main/java/com/uber/cadence/worker/Worker.java b/src/main/java/com/uber/cadence/worker/Worker.java index c7af4c918..431a3421b 100644 --- a/src/main/java/com/uber/cadence/worker/Worker.java +++ b/src/main/java/com/uber/cadence/worker/Worker.java @@ -595,8 +595,7 @@ enum State { public static class FactoryOptions { public static class Builder { - // TODO: Enable by default as soon the service is released - private boolean disableStickyExecution = true; + private boolean disableStickyExecution; private int stickyDecisionScheduleToStartTimeoutInSeconds = 5; private int cacheMaximumSize = 600; private int maxWorkflowThreadCount = 600; diff --git a/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java b/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java index 552e7c6b6..e2dd5fe86 100644 --- a/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java +++ b/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java @@ -34,6 +34,8 @@ import com.uber.cadence.internal.metrics.MetricsTag; import com.uber.cadence.internal.metrics.MetricsType; import com.uber.cadence.internal.replay.DeciderCache; +import com.uber.cadence.serviceclient.IWorkflowService; +import com.uber.cadence.serviceclient.WorkflowServiceTChannel; import com.uber.cadence.testing.TestEnvironmentOptions; import com.uber.cadence.testing.TestWorkflowEnvironment; import com.uber.cadence.workflow.Async; @@ -56,24 +58,19 @@ import java.util.Map; import java.util.Queue; import java.util.Random; -import org.junit.Assert; -import org.junit.Ignore; -import org.junit.Rule; -import org.junit.Test; +import org.junit.*; import org.junit.rules.TestName; +import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -// @RunWith(Parameterized.class) -@Ignore +@RunWith(Parameterized.class) public class StickyWorkerTest { public static final String DOMAIN = "UnitTest"; - // TODO: Enable for docker as soon as the server commit - // a36c84991664571636d37a3826b282ddbdbd2402 is released - private static final boolean skipDockerService = true; - // Boolean.parseBoolean(System.getenv("SKIP_DOCKER_SERVICE")); + private static final boolean skipDockerService = + Boolean.parseBoolean(System.getenv("SKIP_DOCKER_SERVICE")); @Parameterized.Parameter public boolean useExternalService; @@ -91,6 +88,15 @@ public static Object[] data() { @Rule public TestName testName = new TestName(); + private IWorkflowService service; + + @Before + public void setUp() { + if (testType.equals("Docker") && service == null) { + service = new WorkflowServiceTChannel(); + } + } + @Test public void whenStickyIsEnabledThenTheWorkflowIsCachedSignals() throws Exception { // Arrange @@ -104,7 +110,10 @@ public void whenStickyIsEnabledThenTheWorkflowIsCachedSignals() throws Exception TestEnvironmentWrapper wrapper = new TestEnvironmentWrapper( - new Worker.FactoryOptions.Builder().setMetricScope(scope).build()); + new Worker.FactoryOptions.Builder() + .setDisableStickyExecution(false) + .setMetricScope(scope) + .build()); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker(taskListName, new WorkerOptions.Builder().build()); worker.registerWorkflowImplementationTypes(GreetingSignalWorkflowImpl.class); @@ -159,7 +168,10 @@ public void whenStickyIsEnabledThenTheWorkflowIsCachedActivities() throws Except TestEnvironmentWrapper wrapper = new TestEnvironmentWrapper( - new Worker.FactoryOptions.Builder().setMetricScope(scope).build()); + new Worker.FactoryOptions.Builder() + .setDisableStickyExecution(false) + .setMetricScope(scope) + .build()); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker(taskListName, new WorkerOptions.Builder().build()); worker.registerWorkflowImplementationTypes(ActivitiesWorkflowImpl.class); @@ -214,7 +226,10 @@ public void whenStickyIsEnabledThenTheWorkflowIsCachedChildWorkflows() throws Ex TestEnvironmentWrapper wrapper = new TestEnvironmentWrapper( - new Worker.FactoryOptions.Builder().setMetricScope(scope).build()); + new Worker.FactoryOptions.Builder() + .setDisableStickyExecution(false) + .setMetricScope(scope) + .build()); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker(taskListName, new WorkerOptions.Builder().build()); worker.registerWorkflowImplementationTypes( @@ -262,7 +277,10 @@ public void whenStickyIsEnabledThenTheWorkflowIsCachedMutableSideEffect() throws TestEnvironmentWrapper wrapper = new TestEnvironmentWrapper( - new Worker.FactoryOptions.Builder().setMetricScope(scope).build()); + new Worker.FactoryOptions.Builder() + .setDisableStickyExecution(false) + .setMetricScope(scope) + .build()); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker(taskListName, new WorkerOptions.Builder().build()); worker.registerWorkflowImplementationTypes(TestMutableSideEffectWorkflowImpl.class); @@ -343,7 +361,8 @@ public void whenCacheIsEvictedTheWorkerCanRecover() throws Exception { // Arrange String taskListName = "evictedStickyTest"; TestEnvironmentWrapper wrapper = - new TestEnvironmentWrapper(new Worker.FactoryOptions.Builder().build()); + new TestEnvironmentWrapper( + new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build()); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker(taskListName); worker.registerWorkflowImplementationTypes(GreetingSignalWorkflowImpl.class); @@ -382,7 +401,8 @@ public void workflowsCanBeQueried() throws Exception { // Arrange String taskListName = "queryStickyTest"; TestEnvironmentWrapper wrapper = - new TestEnvironmentWrapper(new Worker.FactoryOptions.Builder().build()); + new TestEnvironmentWrapper( + new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build()); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker(taskListName); worker.registerWorkflowImplementationTypes(GreetingSignalWorkflowImpl.class); @@ -422,7 +442,8 @@ public void workflowsCanBeQueriedAfterEviction() throws Exception { // Arrange String taskListName = "queryEvictionStickyTest"; TestEnvironmentWrapper wrapper = - new TestEnvironmentWrapper(new Worker.FactoryOptions.Builder().build()); + new TestEnvironmentWrapper( + new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build()); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker(taskListName); worker.registerWorkflowImplementationTypes(GreetingSignalWorkflowImpl.class); @@ -467,12 +488,18 @@ private class TestEnvironmentWrapper { public TestEnvironmentWrapper(Worker.FactoryOptions options) { if (options == null) { - options = new Worker.FactoryOptions.Builder().build(); + options = new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build(); + } + if (useExternalService) { + factory = new Worker.Factory(service, DOMAIN, options); + } else { + TestEnvironmentOptions testOptions = + new TestEnvironmentOptions.Builder() + .setDomain(DOMAIN) + .setFactoryOptions(options) + .build(); + testEnv = TestWorkflowEnvironment.newInstance(testOptions); } - factory = new Worker.Factory(DOMAIN, options); - TestEnvironmentOptions testOptions = - new TestEnvironmentOptions.Builder().setDomain(DOMAIN).setFactoryOptions(options).build(); - testEnv = TestWorkflowEnvironment.newInstance(testOptions); } private Worker.Factory getWorkerFactory() { @@ -480,12 +507,17 @@ private Worker.Factory getWorkerFactory() { } private WorkflowClient getWorkflowClient() { - return useExternalService ? WorkflowClient.newInstance(DOMAIN) : testEnv.newWorkflowClient(); + return useExternalService + ? WorkflowClient.newInstance(service, DOMAIN) + : testEnv.newWorkflowClient(); } private void close() { - factory.shutdown(Duration.ofSeconds(1)); - testEnv.close(); + if (useExternalService) { + factory.shutdown(Duration.ofSeconds(1)); + } else { + testEnv.close(); + } } } diff --git a/src/test/java/com/uber/cadence/worker/WorkerStressTests.java b/src/test/java/com/uber/cadence/worker/WorkerStressTests.java index 44512ce73..d13019b2d 100644 --- a/src/test/java/com/uber/cadence/worker/WorkerStressTests.java +++ b/src/test/java/com/uber/cadence/worker/WorkerStressTests.java @@ -171,7 +171,7 @@ private class TestEnvironmentWrapper { public TestEnvironmentWrapper(Worker.FactoryOptions options) { if (options == null) { - options = new Worker.FactoryOptions.Builder().build(); + options = new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build(); } factory = new Worker.Factory(DOMAIN, options); TestEnvironmentOptions testOptions = diff --git a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java index d0bedb7f0..dda1d544f 100644 --- a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java +++ b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java @@ -57,6 +57,7 @@ import com.uber.cadence.workflow.Functions.Func1; import java.io.FileNotFoundException; import java.io.IOException; +import java.lang.management.ManagementFactory; import java.lang.reflect.Type; import java.time.Duration; import java.util.*; @@ -109,9 +110,16 @@ public class WorkflowTest { @Parameters(name = "{1}") public static Object[] data() { if (skipDockerService) { - return new Object[][] {{false, "TestService"}}; + return new Object[][] { + {false, "TestService Sticky Off", true}, {false, "TestService Sticky On", false} + }; } else { - return new Object[][] {{true, "Docker"}, {false, "TestService"}}; + return new Object[][] { + {true, "Docker Sticky OFF", true}, + {true, "Docker Sticky ON", false}, + {false, "TestService Sticky OFF", true}, + {false, "TestService Sticky ON", false} + }; } } @@ -119,7 +127,7 @@ public static Object[] data() { @Rule public Timeout globalTimeout = - Timeout.seconds(DEBUGGER_TIMEOUTS ? 500 : skipDockerService ? 10 : 20); + Timeout.seconds(DEBUGGER_TIMEOUTS ? 500 : skipDockerService ? 10 : 30); @Rule public TestWatcher watchman = @@ -140,6 +148,9 @@ protected void failed(Throwable e, Description description) { @Parameter(1) public String testType; + @Parameter(2) + public boolean disableStickyExecution; + public static final String DOMAIN = "UnitTest"; private static final Logger log = LoggerFactory.getLogger(WorkflowTest.class); @@ -207,15 +218,15 @@ public void setUp() { tracer = new TracingWorkflowInterceptorFactory(); // TODO: Create a version of TestWorkflowEnvironment that runs against a real service. if (useExternalService) { - // TODO: Enable sticky execution as soon as the server commit - // a36c84991664571636d37a3826b282ddbdbd2402 is released Worker.FactoryOptions factoryOptions = - new Worker.FactoryOptions.Builder().setDisableStickyExecution(true).build(); + new Worker.FactoryOptions.Builder() + .setDisableStickyExecution(disableStickyExecution) + .build(); workerFactory = new Worker.Factory(service, DOMAIN, factoryOptions); WorkerOptions workerOptions = new WorkerOptions.Builder().setInterceptorFactory(tracer).build(); worker = workerFactory.newWorker(taskList, workerOptions); - workflowClient = WorkflowClient.newInstance(DOMAIN); + workflowClient = WorkflowClient.newInstance(service, DOMAIN); WorkflowClientOptions clientOptions = new WorkflowClientOptions.Builder() .setDataConverter(JsonDataConverter.getInstance()) @@ -227,6 +238,10 @@ public void setUp() { new TestEnvironmentOptions.Builder() .setDomain(DOMAIN) .setInterceptorFactory(tracer) + .setFactoryOptions( + new Worker.FactoryOptions.Builder() + .setDisableStickyExecution(disableStickyExecution) + .build()) .build(); testEnvironment = TestWorkflowEnvironment.newInstance(testOptions); worker = testEnvironment.newWorker(taskList); @@ -247,7 +262,9 @@ public void setUp() { @After public void tearDown() throws Throwable { - activitiesImpl.close(); + if (activitiesImpl != null) { + activitiesImpl.close(); + } if (testEnvironment != null) { testEnvironment.close(); } @@ -275,6 +292,8 @@ private void startWorkerFor(Class... workflowTypes) { } } + // TODO: Refactor testEnvironment to support testing through real service to avoid this + // conditional switches void registerDelayedCallback(Duration delay, Runnable r) { if (useExternalService) { ScheduledFuture result = @@ -285,6 +304,18 @@ void registerDelayedCallback(Duration delay, Runnable r) { } } + void sleep(Duration d) { + if (useExternalService) { + try { + Thread.sleep(d.toMillis()); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted", e); + } + } else { + testEnvironment.sleep(d); + } + } + public interface TestWorkflow1 { @WorkflowMethod @@ -627,13 +658,13 @@ public void testHeartbeatTimeoutDetails() { } @Test - public void testSyncUntypedAndStackTrace() throws InterruptedException { + public void testSyncUntypedAndStackTrace() { startWorkerFor(TestSyncWorkflowImpl.class); WorkflowStub workflowStub = workflowClient.newUntypedWorkflowStub( "TestWorkflow1::execute", newWorkflowOptionsBuilder(taskList).build()); WorkflowExecution execution = workflowStub.start(taskList); - Thread.sleep(500); + sleep(Duration.ofMillis(500)); String stackTrace = workflowStub.query(WorkflowClient.QUERY_TYPE_STACK_TRCE, String.class); assertTrue(stackTrace, stackTrace.contains("WorkflowTest$TestSyncWorkflowImpl.execute")); assertTrue(stackTrace, stackTrace.contains("activityWithDelay")); @@ -754,13 +785,13 @@ public String execute(String taskList) { } @Test - public void testDetachedScope() throws InterruptedException { + public void testDetachedScope() { startWorkerFor(TestDetachedCancellationScope.class); WorkflowStub client = workflowClient.newUntypedWorkflowStub( "TestWorkflow1::execute", newWorkflowOptionsBuilder(taskList).build()); client.start(taskList); - Thread.sleep(500); // To let activityWithDelay start. + sleep(Duration.ofMillis(500)); // To let activityWithDelay start. client.cancel(); try { client.getResult(String.class); @@ -1847,68 +1878,95 @@ public void mySignal(String value) { } @Test - public void testSignal() { - AtomicReference execution = new AtomicReference<>(); + public void testSignal() throws Exception { + // Test getTrace through replay by a local worker. + Worker queryWorker; + if (useExternalService) { + Worker.Factory workerFactory = new Worker.Factory(service, DOMAIN); + queryWorker = workerFactory.newWorker(taskList); + } else { + queryWorker = testEnvironment.newWorker(taskList); + } + queryWorker.registerWorkflowImplementationTypes(TestSignalWorkflowImpl.class); startWorkerFor(TestSignalWorkflowImpl.class); WorkflowOptions.Builder optionsBuilder = newWorkflowOptionsBuilder(taskList); String workflowId = UUID.randomUUID().toString(); optionsBuilder.setWorkflowId(workflowId); - AtomicReference client = new AtomicReference<>(); - registerDelayedCallback( - Duration.ofSeconds(1), - () -> { - assertEquals(workflowId, execution.get().getWorkflowId()); - assertEquals("initial", client.get().getState()); - client.get().mySignal("Hello "); - try { - Thread.sleep(200); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - // Test client.get() created using WorkflowExecution - client.set( - workflowClient.newWorkflowStub( - QueryableWorkflow.class, - execution.get().getWorkflowId(), - Optional.of(execution.get().getRunId()))); - assertEquals("Hello ", client.get().getState()); - - // Test getTrace through replay by a local worker. - Worker queryWorker; - if (useExternalService) { - Worker.Factory workerFactory = new Worker.Factory(service, DOMAIN); - queryWorker = workerFactory.newWorker(taskList); - } else { - queryWorker = testEnvironment.newWorker(taskList); - } - queryWorker.registerWorkflowImplementationTypes(TestSignalWorkflowImpl.class); - String queryResult = null; - try { - queryResult = - queryWorker.queryWorkflowExecution( - execution.get(), "QueryableWorkflow::getState", String.class); - } catch (Exception e) { - throw new RuntimeException(e); - } - assertEquals("Hello ", queryResult); - try { - Thread.sleep(500); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - client.get().mySignal("World!"); - assertEquals("World!", client.get().getState()); - assertEquals( - "Hello World!", - workflowClient - .newUntypedWorkflowStub(execution.get(), Optional.empty()) - .getResult(String.class)); - }); - client.set(workflowClient.newWorkflowStub(QueryableWorkflow.class, optionsBuilder.build())); + QueryableWorkflow client = + workflowClient.newWorkflowStub(QueryableWorkflow.class, optionsBuilder.build()); // To execute workflow client.execute() would do. But we want to start workflow and immediately // return. - execution.set(WorkflowClient.start(client.get()::execute)); + WorkflowExecution execution = WorkflowClient.start(client::execute); + + sleep(Duration.ofSeconds(1)); + assertEquals(workflowId, execution.getWorkflowId()); + // Calls query multiple times to check at the end of the method that if it doesn't leak threads + assertEquals("initial", client.getState()); + sleep(Duration.ofSeconds(1)); + + client.mySignal("Hello "); + sleep(Duration.ofSeconds(1)); + + // Test client created using WorkflowExecution + QueryableWorkflow client2 = + workflowClient.newWorkflowStub( + QueryableWorkflow.class, execution.getWorkflowId(), Optional.of(execution.getRunId())); + assertEquals("Hello ", client2.getState()); + + String queryResult = null; + queryResult = + queryWorker.queryWorkflowExecution(execution, "QueryableWorkflow::getState", String.class); + assertEquals("Hello ", queryResult); + sleep(Duration.ofMillis(500)); + client2.mySignal("World!"); + sleep(Duration.ofMillis(500)); + assertEquals("World!", client2.getState()); + assertEquals( + "Hello World!", + workflowClient.newUntypedWorkflowStub(execution, Optional.empty()).getResult(String.class)); + client2.execute(); + } + + public static class TestNoQueryWorkflowImpl implements QueryableWorkflow { + + CompletablePromise promise = Workflow.newPromise(); + + @Override + public String execute() { + promise.get(); + return "done"; + } + + @Override + public String getState() { + return "some state"; + } + + @Override + public void mySignal(String value) { + promise.complete(null); + } + } + + @Test + public void testNoQueryThreadLeak() { + startWorkerFor(TestNoQueryWorkflowImpl.class); + int threadCount = ManagementFactory.getThreadMXBean().getThreadCount(); + WorkflowOptions.Builder optionsBuilder = newWorkflowOptionsBuilder(taskList); + QueryableWorkflow client = + workflowClient.newWorkflowStub(QueryableWorkflow.class, optionsBuilder.build()); + WorkflowClient.start(client::execute); + sleep(Duration.ofSeconds(1)); + // Calls query multiple times to check at the end of the method that if it doesn't leak threads + int queryCount = 100; + for (int i = 0; i < queryCount; i++) { + assertEquals("some state", client.getState()); + } + client.mySignal("Hello "); + client.execute(); + // Ensures that no threads were leaked due to query + int threadsCreated = ManagementFactory.getThreadMXBean().getThreadCount() - threadCount; + assertTrue("query leaks threads: " + threadsCreated, threadsCreated < queryCount); } @Test @@ -1942,7 +2000,7 @@ public void testSignalUntyped() { } static final AtomicInteger decisionCount = new AtomicInteger(); - static final CompletableFuture sendSignal = new CompletableFuture<>(); + static CompletableFuture sendSignal; public static class TestSignalDuringLastDecisionWorkflowImpl implements TestWorkflowSignaled { @@ -1955,7 +2013,7 @@ public String execute() { // Never sleep in a real workflow using Thread.sleep. // Here it is to simulate a race condition. try { - Thread.sleep(500); + Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -1970,7 +2028,9 @@ public void signal1(String arg) { } @Test - public void testSignalDuringLastDecision() throws InterruptedException { + public void testSignalDuringLastDecision() { + decisionCount.set(0); + sendSignal = new CompletableFuture<>(); startWorkerFor(TestSignalDuringLastDecisionWorkflowImpl.class); WorkflowOptions.Builder options = newWorkflowOptionsBuilder(taskList); options.setWorkflowId("testSignalDuringLastDecision-" + UUID.randomUUID().toString()); @@ -1996,6 +2056,7 @@ public void testSignalDuringLastDecision() throws InterruptedException { .newUntypedWorkflowStub(execution, Optional.empty()) .getResult(String.class)); }); + sleep(Duration.ofSeconds(2)); } public static class TestTimerCallbackBlockedWorkflowImpl implements TestWorkflow1 { @@ -2040,7 +2101,7 @@ public interface ITestNamedChild { String execute(String arg); } - private static String child2Id = UUID.randomUUID().toString(); + private static String child2Id; public static class TestParentWorkflow implements TestWorkflow1 { @@ -2080,6 +2141,7 @@ public String execute(String arg) { @Test public void testChildWorkflow() { + child2Id = UUID.randomUUID().toString(); startWorkerFor(TestParentWorkflow.class, TestNamedChild.class, TestChild.class); WorkflowOptions.Builder options = new WorkflowOptions.Builder(); @@ -3414,11 +3476,7 @@ public void testGenericParametersWorkflow() throws ExecutionException, Interrupt WorkflowClient.execute(workflowStub::execute, taskList, uuidList, uuidSet); // Test signal and query serialization workflowStub.signal(uuidList); - if (useExternalService) { - Thread.sleep(10000); - } else { - testEnvironment.sleep(Duration.ofSeconds(1)); - } + sleep(Duration.ofSeconds(1)); List queryArg = new ArrayList<>(); queryArg.add(UUID.randomUUID()); queryArg.add(UUID.randomUUID()); diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index 78a4827d7..b87fe0374 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -25,7 +25,7 @@ - + \ No newline at end of file