diff --git a/src/main/java/com/uber/cadence/internal/metrics/ServiceMethod.java b/src/main/java/com/uber/cadence/internal/metrics/ServiceMethod.java index 71ad3c9ba..55125ef83 100644 --- a/src/main/java/com/uber/cadence/internal/metrics/ServiceMethod.java +++ b/src/main/java/com/uber/cadence/internal/metrics/ServiceMethod.java @@ -22,6 +22,7 @@ public class ServiceMethod { MetricsType.CADENCE_METRICS_PREFIX + "DeprecateDomain"; public static final String DESCRIBE_DOMAIN = MetricsType.CADENCE_METRICS_PREFIX + "DescribeDomain"; + public static final String LIST_DOMAINS = MetricsType.CADENCE_METRICS_PREFIX + "ListDomains"; public static final String GET_WORKFLOW_EXECUTION_HISTORY = MetricsType.CADENCE_METRICS_PREFIX + "GetWorkflowExecutionHistory"; public static final String LIST_CLOSED_WORKFLOW_EXECUTIONS = @@ -72,4 +73,6 @@ public class ServiceMethod { MetricsType.CADENCE_METRICS_PREFIX + "RespondQueryTaskCompleted"; public static final String DESCRIBE_WORKFLOW_EXECUTION = MetricsType.CADENCE_METRICS_PREFIX + "DescribeWorkflowExecution"; + public static final String RESET_STICKY_TASK_LIST = + MetricsType.CADENCE_METRICS_PREFIX + "ResetStickyTaskList"; } diff --git a/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java b/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java index 8a276705d..df6a05224 100644 --- a/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java +++ b/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java @@ -29,12 +29,16 @@ import com.uber.cadence.DescribeWorkflowExecutionRequest; import com.uber.cadence.DescribeWorkflowExecutionResponse; import com.uber.cadence.DomainAlreadyExistsError; +import com.uber.cadence.DomainNotActiveError; import com.uber.cadence.EntityNotExistsError; import com.uber.cadence.GetWorkflowExecutionHistoryRequest; import com.uber.cadence.GetWorkflowExecutionHistoryResponse; import com.uber.cadence.InternalServiceError; +import com.uber.cadence.LimitExceededError; import com.uber.cadence.ListClosedWorkflowExecutionsRequest; import com.uber.cadence.ListClosedWorkflowExecutionsResponse; +import com.uber.cadence.ListDomainsRequest; +import com.uber.cadence.ListDomainsResponse; import com.uber.cadence.ListOpenWorkflowExecutionsRequest; import com.uber.cadence.ListOpenWorkflowExecutionsResponse; import com.uber.cadence.PollForActivityTaskRequest; @@ -44,10 +48,13 @@ import com.uber.cadence.QueryFailedError; import com.uber.cadence.QueryWorkflowRequest; import com.uber.cadence.QueryWorkflowResponse; +import com.uber.cadence.RecordActivityTaskHeartbeatByIDRequest; import com.uber.cadence.RecordActivityTaskHeartbeatRequest; import com.uber.cadence.RecordActivityTaskHeartbeatResponse; import com.uber.cadence.RegisterDomainRequest; import com.uber.cadence.RequestCancelWorkflowExecutionRequest; +import com.uber.cadence.ResetStickyTaskListRequest; +import com.uber.cadence.ResetStickyTaskListResponse; import com.uber.cadence.RespondActivityTaskCanceledByIDRequest; import com.uber.cadence.RespondActivityTaskCanceledRequest; import com.uber.cadence.RespondActivityTaskCompletedByIDRequest; @@ -55,9 +62,11 @@ import com.uber.cadence.RespondActivityTaskFailedByIDRequest; import com.uber.cadence.RespondActivityTaskFailedRequest; import com.uber.cadence.RespondDecisionTaskCompletedRequest; +import com.uber.cadence.RespondDecisionTaskCompletedResponse; import com.uber.cadence.RespondDecisionTaskFailedRequest; import com.uber.cadence.RespondQueryTaskCompletedRequest; import com.uber.cadence.ServiceBusyError; +import com.uber.cadence.SignalWithStartWorkflowExecutionRequest; import com.uber.cadence.SignalWorkflowExecutionRequest; import com.uber.cadence.StartWorkflowExecutionRequest; import com.uber.cadence.StartWorkflowExecutionResponse; @@ -379,6 +388,14 @@ public RecordActivityTaskHeartbeatResponse RecordActivityTaskHeartbeat( return impl.RecordActivityTaskHeartbeat(heartbeatRequest); } + @Override + public RecordActivityTaskHeartbeatResponse RecordActivityTaskHeartbeatByID( + RecordActivityTaskHeartbeatByIDRequest heartbeatRequest) + throws BadRequestError, InternalServiceError, EntityNotExistsError, DomainNotActiveError, + LimitExceededError, ServiceBusyError, TException { + return impl.RecordActivityTaskHeartbeatByID(heartbeatRequest); + } + @Override public void RespondActivityTaskCompleted(RespondActivityTaskCompletedRequest completeRequest) throws BadRequestError, InternalServiceError, EntityNotExistsError, TException { @@ -431,6 +448,15 @@ public void SignalWorkflowExecution(SignalWorkflowExecutionRequest signalRequest impl.SignalWorkflowExecution(signalRequest); } + @Override + public StartWorkflowExecutionResponse SignalWithStartWorkflowExecution( + SignalWithStartWorkflowExecutionRequest signalWithStartRequest) + throws BadRequestError, InternalServiceError, EntityNotExistsError, ServiceBusyError, + DomainNotActiveError, LimitExceededError, WorkflowExecutionAlreadyStartedError, + TException { + return impl.SignalWithStartWorkflowExecution(signalWithStartRequest); + } + @Override public void TerminateWorkflowExecution(TerminateWorkflowExecutionRequest terminateRequest) throws BadRequestError, InternalServiceError, EntityNotExistsError, ServiceBusyError, @@ -460,6 +486,13 @@ public void RespondQueryTaskCompleted(RespondQueryTaskCompletedRequest completeR impl.RespondQueryTaskCompleted(completeRequest); } + @Override + public ResetStickyTaskListResponse ResetStickyTaskList(ResetStickyTaskListRequest resetRequest) + throws BadRequestError, InternalServiceError, EntityNotExistsError, LimitExceededError, + ServiceBusyError, DomainNotActiveError, TException { + return impl.ResetStickyTaskList(resetRequest); + } + @Override public QueryWorkflowResponse QueryWorkflow(QueryWorkflowRequest queryRequest) throws BadRequestError, InternalServiceError, EntityNotExistsError, QueryFailedError, @@ -494,6 +527,12 @@ public void DescribeDomain( impl.DescribeDomain(describeRequest, resultHandler); } + @Override + public void ListDomains(ListDomainsRequest listRequest, AsyncMethodCallback resultHandler) + throws TException { + impl.ListDomains(listRequest, resultHandler); + } + @Override public void UpdateDomain(UpdateDomainRequest updateRequest, AsyncMethodCallback resultHandler) throws TException { @@ -556,6 +595,13 @@ public void RecordActivityTaskHeartbeat( impl.RecordActivityTaskHeartbeat(heartbeatRequest, resultHandler); } + @Override + public void RecordActivityTaskHeartbeatByID( + RecordActivityTaskHeartbeatByIDRequest heartbeatRequest, AsyncMethodCallback resultHandler) + throws TException { + impl.RecordActivityTaskHeartbeatByID(heartbeatRequest, resultHandler); + } + @Override public void RespondActivityTaskCompleted( RespondActivityTaskCompletedRequest completeRequest, AsyncMethodCallback resultHandler) @@ -612,6 +658,14 @@ public void SignalWorkflowExecution( impl.SignalWorkflowExecution(signalRequest, resultHandler); } + @Override + public void SignalWithStartWorkflowExecution( + SignalWithStartWorkflowExecutionRequest signalWithStartRequest, + AsyncMethodCallback resultHandler) + throws TException { + impl.SignalWithStartWorkflowExecution(signalWithStartRequest, resultHandler); + } + @Override public void TerminateWorkflowExecution( TerminateWorkflowExecutionRequest terminateRequest, AsyncMethodCallback resultHandler) @@ -640,6 +694,13 @@ public void RespondQueryTaskCompleted( impl.RespondQueryTaskCompleted(completeRequest, resultHandler); } + @Override + public void ResetStickyTaskList( + ResetStickyTaskListRequest resetRequest, AsyncMethodCallback resultHandler) + throws TException { + impl.ResetStickyTaskList(resetRequest, resultHandler); + } + @Override public void QueryWorkflow(QueryWorkflowRequest queryRequest, AsyncMethodCallback resultHandler) throws TException { @@ -671,6 +732,13 @@ public DescribeDomainResponse DescribeDomain(DescribeDomainRequest describeReque return impl.DescribeDomain(describeRequest); } + @Override + public ListDomainsResponse ListDomains(ListDomainsRequest listRequest) + throws BadRequestError, InternalServiceError, EntityNotExistsError, ServiceBusyError, + TException { + return impl.ListDomains(listRequest); + } + @Override public UpdateDomainResponse UpdateDomain(UpdateDomainRequest updateRequest) throws BadRequestError, InternalServiceError, EntityNotExistsError, TException { @@ -706,9 +774,10 @@ public PollForDecisionTaskResponse PollForDecisionTask(PollForDecisionTaskReques } @Override - public void RespondDecisionTaskCompleted(RespondDecisionTaskCompletedRequest completeRequest) + public RespondDecisionTaskCompletedResponse RespondDecisionTaskCompleted( + RespondDecisionTaskCompletedRequest completeRequest) throws BadRequestError, InternalServiceError, EntityNotExistsError, TException { - impl.RespondDecisionTaskCompleted(completeRequest); + return impl.RespondDecisionTaskCompleted(completeRequest); } @Override diff --git a/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java b/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java index adfac3aff..f382afa8e 100644 --- a/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java +++ b/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java @@ -27,12 +27,16 @@ import com.uber.cadence.DescribeWorkflowExecutionRequest; import com.uber.cadence.DescribeWorkflowExecutionResponse; import com.uber.cadence.DomainAlreadyExistsError; +import com.uber.cadence.DomainNotActiveError; import com.uber.cadence.EntityNotExistsError; import com.uber.cadence.GetWorkflowExecutionHistoryRequest; import com.uber.cadence.GetWorkflowExecutionHistoryResponse; import com.uber.cadence.InternalServiceError; +import com.uber.cadence.LimitExceededError; import com.uber.cadence.ListClosedWorkflowExecutionsRequest; import com.uber.cadence.ListClosedWorkflowExecutionsResponse; +import com.uber.cadence.ListDomainsRequest; +import com.uber.cadence.ListDomainsResponse; import com.uber.cadence.ListOpenWorkflowExecutionsRequest; import com.uber.cadence.ListOpenWorkflowExecutionsResponse; import com.uber.cadence.PollForActivityTaskRequest; @@ -42,10 +46,13 @@ import com.uber.cadence.QueryFailedError; import com.uber.cadence.QueryWorkflowRequest; import com.uber.cadence.QueryWorkflowResponse; +import com.uber.cadence.RecordActivityTaskHeartbeatByIDRequest; import com.uber.cadence.RecordActivityTaskHeartbeatRequest; import com.uber.cadence.RecordActivityTaskHeartbeatResponse; import com.uber.cadence.RegisterDomainRequest; import com.uber.cadence.RequestCancelWorkflowExecutionRequest; +import com.uber.cadence.ResetStickyTaskListRequest; +import com.uber.cadence.ResetStickyTaskListResponse; import com.uber.cadence.RespondActivityTaskCanceledByIDRequest; import com.uber.cadence.RespondActivityTaskCanceledRequest; import com.uber.cadence.RespondActivityTaskCompletedByIDRequest; @@ -53,9 +60,11 @@ import com.uber.cadence.RespondActivityTaskFailedByIDRequest; import com.uber.cadence.RespondActivityTaskFailedRequest; import com.uber.cadence.RespondDecisionTaskCompletedRequest; +import com.uber.cadence.RespondDecisionTaskCompletedResponse; import com.uber.cadence.RespondDecisionTaskFailedRequest; import com.uber.cadence.RespondQueryTaskCompletedRequest; import com.uber.cadence.ServiceBusyError; +import com.uber.cadence.SignalWithStartWorkflowExecutionRequest; import com.uber.cadence.SignalWorkflowExecutionRequest; import com.uber.cadence.StartWorkflowExecutionRequest; import com.uber.cadence.StartWorkflowExecutionResponse; @@ -221,6 +230,14 @@ public RecordActivityTaskHeartbeatResponse RecordActivityTaskHeartbeat( return impl.RecordActivityTaskHeartbeat(heartbeatRequest); } + @Override + public RecordActivityTaskHeartbeatResponse RecordActivityTaskHeartbeatByID( + RecordActivityTaskHeartbeatByIDRequest heartbeatRequest) + throws BadRequestError, InternalServiceError, EntityNotExistsError, DomainNotActiveError, + LimitExceededError, ServiceBusyError, TException { + return impl.RecordActivityTaskHeartbeatByID(heartbeatRequest); + } + @Override public void RespondActivityTaskCompleted(RespondActivityTaskCompletedRequest completeRequest) throws BadRequestError, InternalServiceError, EntityNotExistsError, TException { @@ -273,6 +290,15 @@ public void SignalWorkflowExecution(SignalWorkflowExecutionRequest signalRequest impl.SignalWorkflowExecution(signalRequest); } + @Override + public StartWorkflowExecutionResponse SignalWithStartWorkflowExecution( + SignalWithStartWorkflowExecutionRequest signalWithStartRequest) + throws BadRequestError, InternalServiceError, EntityNotExistsError, ServiceBusyError, + DomainNotActiveError, LimitExceededError, WorkflowExecutionAlreadyStartedError, + TException { + return impl.SignalWithStartWorkflowExecution(signalWithStartRequest); + } + @Override public void TerminateWorkflowExecution(TerminateWorkflowExecutionRequest terminateRequest) throws BadRequestError, InternalServiceError, EntityNotExistsError, ServiceBusyError, @@ -302,6 +328,13 @@ public void RespondQueryTaskCompleted(RespondQueryTaskCompletedRequest completeR impl.RespondQueryTaskCompleted(completeRequest); } + @Override + public ResetStickyTaskListResponse ResetStickyTaskList(ResetStickyTaskListRequest resetRequest) + throws BadRequestError, InternalServiceError, EntityNotExistsError, LimitExceededError, + ServiceBusyError, DomainNotActiveError, TException { + return impl.ResetStickyTaskList(resetRequest); + } + @Override public QueryWorkflowResponse QueryWorkflow(QueryWorkflowRequest queryRequest) throws BadRequestError, InternalServiceError, EntityNotExistsError, QueryFailedError, @@ -336,6 +369,12 @@ public void DescribeDomain( impl.DescribeDomain(describeRequest, resultHandler); } + @Override + public void ListDomains(ListDomainsRequest listRequest, AsyncMethodCallback resultHandler) + throws TException { + impl.ListDomains(listRequest, resultHandler); + } + @Override public void UpdateDomain(UpdateDomainRequest updateRequest, AsyncMethodCallback resultHandler) throws TException { @@ -398,6 +437,13 @@ public void RecordActivityTaskHeartbeat( impl.RecordActivityTaskHeartbeat(heartbeatRequest, resultHandler); } + @Override + public void RecordActivityTaskHeartbeatByID( + RecordActivityTaskHeartbeatByIDRequest heartbeatRequest, AsyncMethodCallback resultHandler) + throws TException { + impl.RecordActivityTaskHeartbeatByID(heartbeatRequest, resultHandler); + } + @Override public void RespondActivityTaskCompleted( RespondActivityTaskCompletedRequest completeRequest, AsyncMethodCallback resultHandler) @@ -454,6 +500,14 @@ public void SignalWorkflowExecution( impl.SignalWorkflowExecution(signalRequest, resultHandler); } + @Override + public void SignalWithStartWorkflowExecution( + SignalWithStartWorkflowExecutionRequest signalWithStartRequest, + AsyncMethodCallback resultHandler) + throws TException { + impl.SignalWithStartWorkflowExecution(signalWithStartRequest, resultHandler); + } + @Override public void TerminateWorkflowExecution( TerminateWorkflowExecutionRequest terminateRequest, AsyncMethodCallback resultHandler) @@ -482,6 +536,13 @@ public void RespondQueryTaskCompleted( impl.RespondQueryTaskCompleted(completeRequest, resultHandler); } + @Override + public void ResetStickyTaskList( + ResetStickyTaskListRequest resetRequest, AsyncMethodCallback resultHandler) + throws TException { + impl.ResetStickyTaskList(resetRequest, resultHandler); + } + @Override public void QueryWorkflow(QueryWorkflowRequest queryRequest, AsyncMethodCallback resultHandler) throws TException { @@ -513,6 +574,13 @@ public DescribeDomainResponse DescribeDomain(DescribeDomainRequest describeReque return impl.DescribeDomain(describeRequest); } + @Override + public ListDomainsResponse ListDomains(ListDomainsRequest listRequest) + throws BadRequestError, InternalServiceError, EntityNotExistsError, ServiceBusyError, + TException { + return impl.ListDomains(listRequest); + } + @Override public UpdateDomainResponse UpdateDomain(UpdateDomainRequest updateRequest) throws BadRequestError, InternalServiceError, EntityNotExistsError, TException { @@ -548,9 +616,10 @@ public PollForDecisionTaskResponse PollForDecisionTask(PollForDecisionTaskReques } @Override - public void RespondDecisionTaskCompleted(RespondDecisionTaskCompletedRequest completeRequest) + public RespondDecisionTaskCompletedResponse RespondDecisionTaskCompleted( + RespondDecisionTaskCompletedRequest completeRequest) throws BadRequestError, InternalServiceError, EntityNotExistsError, TException { - impl.RespondDecisionTaskCompleted(completeRequest); + return impl.RespondDecisionTaskCompleted(completeRequest); } @Override diff --git a/src/main/java/com/uber/cadence/internal/testservice/StateMachines.java b/src/main/java/com/uber/cadence/internal/testservice/StateMachines.java index fbbd33486..6589c9cc7 100644 --- a/src/main/java/com/uber/cadence/internal/testservice/StateMachines.java +++ b/src/main/java/com/uber/cadence/internal/testservice/StateMachines.java @@ -71,7 +71,6 @@ import com.uber.cadence.PollForActivityTaskResponse; import com.uber.cadence.PollForDecisionTaskRequest; import com.uber.cadence.PollForDecisionTaskResponse; -import com.uber.cadence.RecordActivityTaskHeartbeatRequest; import com.uber.cadence.RequestCancelActivityTaskDecisionAttributes; import com.uber.cadence.RequestCancelWorkflowExecutionRequest; import com.uber.cadence.RespondActivityTaskCanceledByIDRequest; @@ -934,11 +933,8 @@ private static void reportActivityTaskCancellation( } private static void heartbeatActivityTask( - RequestContext nullCtx, - ActivityTaskData data, - RecordActivityTaskHeartbeatRequest request, - long notUsed) { - data.heartbeatDetails = request.getDetails(); + RequestContext nullCtx, ActivityTaskData data, byte[] details, long notUsed) { + data.heartbeatDetails = details; } private static void startTimer( diff --git a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableState.java b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableState.java index e63b84554..69c8de0c1 100644 --- a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableState.java +++ b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableState.java @@ -31,7 +31,6 @@ import com.uber.cadence.PollForDecisionTaskResponse; import com.uber.cadence.QueryWorkflowRequest; import com.uber.cadence.QueryWorkflowResponse; -import com.uber.cadence.RecordActivityTaskHeartbeatRequest; import com.uber.cadence.RecordActivityTaskHeartbeatResponse; import com.uber.cadence.RequestCancelWorkflowExecutionRequest; import com.uber.cadence.RespondActivityTaskCanceledByIDRequest; @@ -114,8 +113,7 @@ void failActivityTask(String activityId, RespondActivityTaskFailedRequest reques void failActivityTaskById(String id, RespondActivityTaskFailedByIDRequest failRequest) throws EntityNotExistsError, InternalServiceError, BadRequestError; - RecordActivityTaskHeartbeatResponse heartbeatActivityTask( - String activityId, RecordActivityTaskHeartbeatRequest request) + RecordActivityTaskHeartbeatResponse heartbeatActivityTask(String activityId, byte[] details) throws InternalServiceError, EntityNotExistsError, BadRequestError; void signal(SignalWorkflowExecutionRequest signalRequest) diff --git a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java index 280bfb6b1..9ffe67ed3 100644 --- a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java @@ -45,7 +45,6 @@ import com.uber.cadence.QueryTaskCompletedType; import com.uber.cadence.QueryWorkflowRequest; import com.uber.cadence.QueryWorkflowResponse; -import com.uber.cadence.RecordActivityTaskHeartbeatRequest; import com.uber.cadence.RecordActivityTaskHeartbeatResponse; import com.uber.cadence.RecordMarkerDecisionAttributes; import com.uber.cadence.RequestCancelActivityTaskDecisionAttributes; @@ -1038,14 +1037,13 @@ public void cancelActivityTaskById( @Override public RecordActivityTaskHeartbeatResponse heartbeatActivityTask( - String activityId, RecordActivityTaskHeartbeatRequest request) - throws InternalServiceError, EntityNotExistsError { + String activityId, byte[] details) throws InternalServiceError, EntityNotExistsError { RecordActivityTaskHeartbeatResponse result = new RecordActivityTaskHeartbeatResponse(); try { update( ctx -> { StateMachine activity = getActivity(activityId); - activity.action(StateMachines.Action.UPDATE, ctx, request, 0); + activity.action(StateMachines.Action.UPDATE, ctx, details, 0); if (activity.getState() == StateMachines.State.CANCELLATION_REQUESTED) { result.setCancelRequested(true); } diff --git a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java index 779102367..b8536bd16 100644 --- a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java +++ b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java @@ -27,12 +27,16 @@ import com.uber.cadence.DescribeWorkflowExecutionRequest; import com.uber.cadence.DescribeWorkflowExecutionResponse; import com.uber.cadence.DomainAlreadyExistsError; +import com.uber.cadence.DomainNotActiveError; import com.uber.cadence.EntityNotExistsError; import com.uber.cadence.GetWorkflowExecutionHistoryRequest; import com.uber.cadence.GetWorkflowExecutionHistoryResponse; import com.uber.cadence.InternalServiceError; +import com.uber.cadence.LimitExceededError; import com.uber.cadence.ListClosedWorkflowExecutionsRequest; import com.uber.cadence.ListClosedWorkflowExecutionsResponse; +import com.uber.cadence.ListDomainsRequest; +import com.uber.cadence.ListDomainsResponse; import com.uber.cadence.ListOpenWorkflowExecutionsRequest; import com.uber.cadence.ListOpenWorkflowExecutionsResponse; import com.uber.cadence.PollForActivityTaskRequest; @@ -42,10 +46,13 @@ import com.uber.cadence.QueryFailedError; import com.uber.cadence.QueryWorkflowRequest; import com.uber.cadence.QueryWorkflowResponse; +import com.uber.cadence.RecordActivityTaskHeartbeatByIDRequest; import com.uber.cadence.RecordActivityTaskHeartbeatRequest; import com.uber.cadence.RecordActivityTaskHeartbeatResponse; import com.uber.cadence.RegisterDomainRequest; import com.uber.cadence.RequestCancelWorkflowExecutionRequest; +import com.uber.cadence.ResetStickyTaskListRequest; +import com.uber.cadence.ResetStickyTaskListResponse; import com.uber.cadence.RespondActivityTaskCanceledByIDRequest; import com.uber.cadence.RespondActivityTaskCanceledRequest; import com.uber.cadence.RespondActivityTaskCompletedByIDRequest; @@ -53,11 +60,13 @@ import com.uber.cadence.RespondActivityTaskFailedByIDRequest; import com.uber.cadence.RespondActivityTaskFailedRequest; import com.uber.cadence.RespondDecisionTaskCompletedRequest; +import com.uber.cadence.RespondDecisionTaskCompletedResponse; import com.uber.cadence.RespondDecisionTaskFailedRequest; import com.uber.cadence.RespondQueryTaskCompletedRequest; import com.uber.cadence.ServiceBusyError; import com.uber.cadence.SignalExternalWorkflowExecutionDecisionAttributes; import com.uber.cadence.SignalExternalWorkflowExecutionFailedCause; +import com.uber.cadence.SignalWithStartWorkflowExecutionRequest; import com.uber.cadence.SignalWorkflowExecutionRequest; import com.uber.cadence.StartWorkflowExecutionRequest; import com.uber.cadence.StartWorkflowExecutionResponse; @@ -156,6 +165,13 @@ public DescribeDomainResponse DescribeDomain(DescribeDomainRequest describeReque throw new UnsupportedOperationException("not implemented"); } + @Override + public ListDomainsResponse ListDomains(ListDomainsRequest listRequest) + throws BadRequestError, InternalServiceError, EntityNotExistsError, ServiceBusyError, + TException { + throw new UnsupportedOperationException("not implemented"); + } + @Override public UpdateDomainResponse UpdateDomain(UpdateDomainRequest updateRequest) throws BadRequestError, InternalServiceError, EntityNotExistsError, TException { @@ -282,11 +298,13 @@ public PollForDecisionTaskResponse PollForDecisionTask(PollForDecisionTaskReques } @Override - public void RespondDecisionTaskCompleted(RespondDecisionTaskCompletedRequest request) + public RespondDecisionTaskCompletedResponse RespondDecisionTaskCompleted( + RespondDecisionTaskCompletedRequest request) throws BadRequestError, InternalServiceError, EntityNotExistsError, TException { DecisionTaskToken taskToken = DecisionTaskToken.fromBytes(request.getTaskToken()); TestWorkflowMutableState mutableState = getMutableState(taskToken.getExecutionId()); mutableState.completeDecisionTask(taskToken.getHistorySize(), request); + return new RespondDecisionTaskCompletedResponse(); } @Override @@ -327,7 +345,22 @@ public RecordActivityTaskHeartbeatResponse RecordActivityTaskHeartbeat( throws BadRequestError, InternalServiceError, EntityNotExistsError, TException { ActivityId activityId = ActivityId.fromBytes(heartbeatRequest.getTaskToken()); TestWorkflowMutableState mutableState = getMutableState(activityId.getExecutionId()); - return mutableState.heartbeatActivityTask(activityId.getId(), heartbeatRequest); + return mutableState.heartbeatActivityTask(activityId.getId(), heartbeatRequest.getDetails()); + } + + @Override + public RecordActivityTaskHeartbeatResponse RecordActivityTaskHeartbeatByID( + RecordActivityTaskHeartbeatByIDRequest heartbeatRequest) + throws BadRequestError, InternalServiceError, EntityNotExistsError, DomainNotActiveError, + LimitExceededError, ServiceBusyError, TException { + ExecutionId execution = + new ExecutionId( + heartbeatRequest.getDomain(), + heartbeatRequest.getWorkflowID(), + heartbeatRequest.getRunID()); + TestWorkflowMutableState mutableState = getMutableState(execution); + return mutableState.heartbeatActivityTask( + heartbeatRequest.getActivityID(), heartbeatRequest.getDetails()); } @Override @@ -415,6 +448,15 @@ public void SignalWorkflowExecution(SignalWorkflowExecutionRequest signalRequest mutableState.signal(signalRequest); } + @Override + public StartWorkflowExecutionResponse SignalWithStartWorkflowExecution( + SignalWithStartWorkflowExecutionRequest signalWithStartRequest) + throws BadRequestError, InternalServiceError, EntityNotExistsError, ServiceBusyError, + DomainNotActiveError, LimitExceededError, WorkflowExecutionAlreadyStartedError, + TException { + throw new UnsupportedOperationException("not implemented"); + } + public void signalExternalWorkflowExecution( String signalId, SignalExternalWorkflowExecutionDecisionAttributes a, @@ -522,6 +564,13 @@ public void RespondQueryTaskCompleted(RespondQueryTaskCompletedRequest completeR mutableState.completeQuery(queryId, completeRequest); } + @Override + public ResetStickyTaskListResponse ResetStickyTaskList(ResetStickyTaskListRequest resetRequest) + throws BadRequestError, InternalServiceError, EntityNotExistsError, LimitExceededError, + ServiceBusyError, DomainNotActiveError, TException { + throw new UnsupportedOperationException("not implemented"); + } + @Override public QueryWorkflowResponse QueryWorkflow(QueryWorkflowRequest queryRequest) throws BadRequestError, InternalServiceError, EntityNotExistsError, QueryFailedError, @@ -557,6 +606,12 @@ public void DescribeDomain( throw new UnsupportedOperationException("not implemented"); } + @Override + public void ListDomains(ListDomainsRequest listRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new UnsupportedOperationException("not implemented"); + } + @Override public void UpdateDomain(UpdateDomainRequest updateRequest, AsyncMethodCallback resultHandler) throws TException { @@ -626,6 +681,13 @@ public void RecordActivityTaskHeartbeat( throw new UnsupportedOperationException("not implemented"); } + @Override + public void RecordActivityTaskHeartbeatByID( + RecordActivityTaskHeartbeatByIDRequest heartbeatRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new UnsupportedOperationException("not implemented"); + } + @Override public void RespondActivityTaskCompleted( RespondActivityTaskCompletedRequest completeRequest, AsyncMethodCallback resultHandler) @@ -682,6 +744,14 @@ public void SignalWorkflowExecution( throw new UnsupportedOperationException("not implemented"); } + @Override + public void SignalWithStartWorkflowExecution( + SignalWithStartWorkflowExecutionRequest signalWithStartRequest, + AsyncMethodCallback resultHandler) + throws TException { + throw new UnsupportedOperationException("not implemented"); + } + @Override public void TerminateWorkflowExecution( TerminateWorkflowExecutionRequest terminateRequest, AsyncMethodCallback resultHandler) @@ -710,6 +780,13 @@ public void RespondQueryTaskCompleted( throw new UnsupportedOperationException("not implemented"); } + @Override + public void ResetStickyTaskList( + ResetStickyTaskListRequest resetRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new UnsupportedOperationException("not implemented"); + } + @Override public void QueryWorkflow(QueryWorkflowRequest queryRequest, AsyncMethodCallback resultHandler) throws TException { diff --git a/src/main/java/com/uber/cadence/internal/worker/PollDecisionTaskDispatcherFactory.java b/src/main/java/com/uber/cadence/internal/worker/PollDecisionTaskDispatcherFactory.java index 62208d1b2..114bbb57a 100644 --- a/src/main/java/com/uber/cadence/internal/worker/PollDecisionTaskDispatcherFactory.java +++ b/src/main/java/com/uber/cadence/internal/worker/PollDecisionTaskDispatcherFactory.java @@ -19,7 +19,6 @@ import com.uber.cadence.PollForDecisionTaskResponse; import com.uber.cadence.serviceclient.IWorkflowService; -import com.uber.cadence.serviceclient.WorkflowServiceTChannel; import java.util.Objects; public final class PollDecisionTaskDispatcherFactory diff --git a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java index 693511a61..76aef9c73 100644 --- a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java +++ b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java @@ -27,11 +27,16 @@ import com.uber.cadence.DescribeWorkflowExecutionRequest; import com.uber.cadence.DescribeWorkflowExecutionResponse; import com.uber.cadence.DomainAlreadyExistsError; +import com.uber.cadence.DomainNotActiveError; import com.uber.cadence.EntityNotExistsError; import com.uber.cadence.GetWorkflowExecutionHistoryRequest; import com.uber.cadence.GetWorkflowExecutionHistoryResponse; +import com.uber.cadence.InternalServiceError; +import com.uber.cadence.LimitExceededError; import com.uber.cadence.ListClosedWorkflowExecutionsRequest; import com.uber.cadence.ListClosedWorkflowExecutionsResponse; +import com.uber.cadence.ListDomainsRequest; +import com.uber.cadence.ListDomainsResponse; import com.uber.cadence.ListOpenWorkflowExecutionsRequest; import com.uber.cadence.ListOpenWorkflowExecutionsResponse; import com.uber.cadence.PollForActivityTaskRequest; @@ -41,10 +46,13 @@ import com.uber.cadence.QueryFailedError; import com.uber.cadence.QueryWorkflowRequest; import com.uber.cadence.QueryWorkflowResponse; +import com.uber.cadence.RecordActivityTaskHeartbeatByIDRequest; import com.uber.cadence.RecordActivityTaskHeartbeatRequest; import com.uber.cadence.RecordActivityTaskHeartbeatResponse; import com.uber.cadence.RegisterDomainRequest; import com.uber.cadence.RequestCancelWorkflowExecutionRequest; +import com.uber.cadence.ResetStickyTaskListRequest; +import com.uber.cadence.ResetStickyTaskListResponse; import com.uber.cadence.RespondActivityTaskCanceledByIDRequest; import com.uber.cadence.RespondActivityTaskCanceledRequest; import com.uber.cadence.RespondActivityTaskCompletedByIDRequest; @@ -52,8 +60,11 @@ import com.uber.cadence.RespondActivityTaskFailedByIDRequest; import com.uber.cadence.RespondActivityTaskFailedRequest; import com.uber.cadence.RespondDecisionTaskCompletedRequest; +import com.uber.cadence.RespondDecisionTaskCompletedResponse; import com.uber.cadence.RespondDecisionTaskFailedRequest; import com.uber.cadence.RespondQueryTaskCompletedRequest; +import com.uber.cadence.ServiceBusyError; +import com.uber.cadence.SignalWithStartWorkflowExecutionRequest; import com.uber.cadence.SignalWorkflowExecutionRequest; import com.uber.cadence.StartWorkflowExecutionRequest; import com.uber.cadence.StartWorkflowExecutionResponse; @@ -525,6 +536,9 @@ private void registerDomain(RegisterDomainRequest registerRequest) throws TExcep if (result.isSetDomainExistsError()) { throw result.getDomainExistsError(); } + if (result.isSetServiceBusyError()) { + throw result.getServiceBusyError(); + } throw new TException("RegisterDomain failed with unknown error:" + result); } finally { if (response != null) { @@ -561,6 +575,9 @@ private DescribeDomainResponse describeDomain(DescribeDomainRequest describeRequ if (result.isSetEntityNotExistError()) { throw result.getEntityNotExistError(); } + if (result.isSetServiceBusyError()) { + throw result.getServiceBusyError(); + } throw new TException("DescribeDomain failed with unknown error:" + result); } finally { if (response != null) { @@ -569,6 +586,44 @@ private DescribeDomainResponse describeDomain(DescribeDomainRequest describeRequ } } + @Override + public ListDomainsResponse ListDomains(ListDomainsRequest listRequest) + throws BadRequestError, InternalServiceError, EntityNotExistsError, ServiceBusyError, + TException { + return measureRemoteCall(ServiceMethod.LIST_DOMAINS, () -> listDomains(listRequest)); + } + + private ListDomainsResponse listDomains(ListDomainsRequest describeRequest) throws TException { + ThriftResponse response = null; + try { + ThriftRequest request = + buildThriftRequest("ListDomains", new WorkflowService.ListDomains_args(describeRequest)); + response = doRemoteCall(request); + WorkflowService.ListDomains_result result = + response.getBody(WorkflowService.ListDomains_result.class); + if (response.getResponseCode() == ResponseCode.OK) { + return result.getSuccess(); + } + if (result.isSetBadRequestError()) { + throw result.getBadRequestError(); + } + if (result.isSetInternalServiceError()) { + throw result.getInternalServiceError(); + } + if (result.isSetEntityNotExistError()) { + throw result.getEntityNotExistError(); + } + if (result.isSetServiceBusyError()) { + throw result.getServiceBusyError(); + } + throw new TException("ListDomains failed with unknown error:" + result); + } finally { + if (response != null) { + response.release(); + } + } + } + @Override public UpdateDomainResponse UpdateDomain(UpdateDomainRequest updateRequest) throws TException { return measureRemoteCall(ServiceMethod.UPDATE_DOMAIN, () -> updateDomain(updateRequest)); @@ -594,6 +649,12 @@ private UpdateDomainResponse updateDomain(UpdateDomainRequest updateRequest) thr if (result.isSetEntityNotExistError()) { throw result.getEntityNotExistError(); } + if (result.isSetServiceBusyError()) { + throw result.getServiceBusyError(); + } + if (result.isSetDomainNotActiveError()) { + throw result.getDomainNotActiveError(); + } throw new TException("UpdateDomain failed with unknown error:" + result); } finally { if (response != null) { @@ -628,6 +689,12 @@ private void deprecateDomain(DeprecateDomainRequest deprecateRequest) throws TEx if (result.isSetEntityNotExistError()) { throw result.getEntityNotExistError(); } + if (result.isSetServiceBusyError()) { + throw result.getServiceBusyError(); + } + if (result.isSetDomainNotActiveError()) { + throw result.getDomainNotActiveError(); + } throw new TException("DeprecateDomain failed with unknown error:" + result); } finally { if (response != null) { @@ -670,6 +737,12 @@ private StartWorkflowExecutionResponse startWorkflowExecution( if (result.isSetServiceBusyError()) { throw result.getServiceBusyError(); } + if (result.isSetDomainNotActiveError()) { + throw result.getDomainNotActiveError(); + } + if (result.isSetLimitExceededError()) { + throw result.getLimitExceededError(); + } throw new TException("StartWorkflowExecution failed with unknown error:" + result); } finally { if (response != null) { @@ -751,6 +824,12 @@ private PollForDecisionTaskResponse pollForDecisionTask(PollForDecisionTaskReque if (result.isSetServiceBusyError()) { throw result.getServiceBusyError(); } + if (result.isSetDomainNotActiveError()) { + throw result.getDomainNotActiveError(); + } + if (result.isSetLimitExceededError()) { + throw result.getLimitExceededError(); + } throw new TException("PollForDecisionTask failed with unknown error:" + result); } finally { if (response != null) { @@ -760,25 +839,27 @@ private PollForDecisionTaskResponse pollForDecisionTask(PollForDecisionTaskReque } @Override - public void RespondDecisionTaskCompleted(RespondDecisionTaskCompletedRequest request) - throws TException { - measureRemoteProc( - ServiceMethod.RESPOND_DECISION_TASK_COMPLETED, () -> respondDecisionTaskCompleted(request)); + public RespondDecisionTaskCompletedResponse RespondDecisionTaskCompleted( + RespondDecisionTaskCompletedRequest completedRequest) throws TException { + return measureRemoteCall( + ServiceMethod.RESPOND_DECISION_TASK_COMPLETED, + () -> respondDecisionTaskCompleted(completedRequest)); } - private void respondDecisionTaskCompleted(RespondDecisionTaskCompletedRequest completeRequest) - throws TException { + private RespondDecisionTaskCompletedResponse respondDecisionTaskCompleted( + RespondDecisionTaskCompletedRequest completedRequest) throws TException { ThriftResponse response = null; try { ThriftRequest request = buildThriftRequest( "RespondDecisionTaskCompleted", - new WorkflowService.RespondDecisionTaskCompleted_args(completeRequest)); + new WorkflowService.RespondDecisionTaskCompleted_args(completedRequest), + options.getRpcLongPollTimeoutMillis()); response = doRemoteCall(request); WorkflowService.RespondDecisionTaskCompleted_result result = response.getBody(WorkflowService.RespondDecisionTaskCompleted_result.class); if (response.getResponseCode() == ResponseCode.OK) { - return; + return result.getSuccess(); } if (result.isSetBadRequestError()) { throw result.getBadRequestError(); @@ -786,8 +867,14 @@ private void respondDecisionTaskCompleted(RespondDecisionTaskCompletedRequest co if (result.isSetInternalServiceError()) { throw result.getInternalServiceError(); } - if (result.isSetEntityNotExistError()) { - throw result.getEntityNotExistError(); + if (result.isSetServiceBusyError()) { + throw result.getServiceBusyError(); + } + if (result.isSetDomainNotActiveError()) { + throw result.getDomainNotActiveError(); + } + if (result.isSetLimitExceededError()) { + throw result.getLimitExceededError(); } throw new TException("RespondDecisionTaskCompleted failed with unknown error:" + result); } finally { @@ -827,6 +914,15 @@ private void respondDecisionTaskFailed(RespondDecisionTaskFailedRequest failedRe if (result.isSetEntityNotExistError()) { throw result.getEntityNotExistError(); } + if (result.isSetServiceBusyError()) { + throw result.getServiceBusyError(); + } + if (result.isSetDomainNotActiveError()) { + throw result.getDomainNotActiveError(); + } + if (result.isSetLimitExceededError()) { + throw result.getLimitExceededError(); + } throw new TException("RespondDecisionTaskFailed failed with unknown error:" + result); } finally { if (response != null) { @@ -866,6 +962,12 @@ private PollForActivityTaskResponse pollForActivityTask(PollForActivityTaskReque if (result.isSetServiceBusyError()) { throw result.getServiceBusyError(); } + if (result.isSetDomainNotActiveError()) { + throw result.getDomainNotActiveError(); + } + if (result.isSetLimitExceededError()) { + throw result.getLimitExceededError(); + } throw new TException("PollForActivityTask failed with unknown error:" + result); } finally { if (response != null) { @@ -904,6 +1006,15 @@ private RecordActivityTaskHeartbeatResponse recordActivityTaskHeartbeat( if (result.isSetEntityNotExistError()) { throw result.getEntityNotExistError(); } + if (result.isSetServiceBusyError()) { + throw result.getServiceBusyError(); + } + if (result.isSetDomainNotActiveError()) { + throw result.getDomainNotActiveError(); + } + if (result.isSetLimitExceededError()) { + throw result.getLimitExceededError(); + } throw new TException("RecordActivityTaskHeartbeat failed with unknown error:" + result); } finally { if (response != null) { @@ -912,6 +1023,56 @@ private RecordActivityTaskHeartbeatResponse recordActivityTaskHeartbeat( } } + @Override + public RecordActivityTaskHeartbeatResponse RecordActivityTaskHeartbeatByID( + RecordActivityTaskHeartbeatByIDRequest heartbeatRequest) + throws BadRequestError, InternalServiceError, EntityNotExistsError, DomainNotActiveError, + LimitExceededError, ServiceBusyError, TException { + return measureRemoteCall( + ServiceMethod.RECORD_ACTIVITY_TASK_HEARTBEAT_BY_ID, + () -> recordActivityTaskHeartbeatByID(heartbeatRequest)); + } + + private RecordActivityTaskHeartbeatResponse recordActivityTaskHeartbeatByID( + RecordActivityTaskHeartbeatByIDRequest heartbeatRequest) throws TException { + ThriftResponse response = null; + try { + ThriftRequest request = + buildThriftRequest( + "RecordActivityTaskHeartbeatByID", + new WorkflowService.RecordActivityTaskHeartbeatByID_args(heartbeatRequest)); + response = doRemoteCall(request); + WorkflowService.RecordActivityTaskHeartbeatByID_result result = + response.getBody(WorkflowService.RecordActivityTaskHeartbeatByID_result.class); + if (response.getResponseCode() == ResponseCode.OK) { + return result.getSuccess(); + } + if (result.isSetBadRequestError()) { + throw result.getBadRequestError(); + } + if (result.isSetInternalServiceError()) { + throw result.getInternalServiceError(); + } + if (result.isSetEntityNotExistError()) { + throw result.getEntityNotExistError(); + } + if (result.isSetServiceBusyError()) { + throw result.getServiceBusyError(); + } + if (result.isSetDomainNotActiveError()) { + throw result.getDomainNotActiveError(); + } + if (result.isSetLimitExceededError()) { + throw result.getLimitExceededError(); + } + throw new TException("RecordActivityTaskHeartbeatByID failed with unknown error:" + result); + } finally { + if (response != null) { + response.release(); + } + } + } + @Override public void RespondActivityTaskCompleted(RespondActivityTaskCompletedRequest request) throws TException { @@ -942,6 +1103,15 @@ private void respondActivityTaskCompleted(RespondActivityTaskCompletedRequest co if (result.isSetEntityNotExistError()) { throw result.getEntityNotExistError(); } + if (result.isSetServiceBusyError()) { + throw result.getServiceBusyError(); + } + if (result.isSetDomainNotActiveError()) { + throw result.getDomainNotActiveError(); + } + if (result.isSetLimitExceededError()) { + throw result.getLimitExceededError(); + } throw new TException("RespondActivityTaskCompleted failed with unknown error:" + result); } finally { if (response != null) { @@ -981,6 +1151,15 @@ private void respondActivityTaskCompletedByID( if (result.isSetEntityNotExistError()) { throw result.getEntityNotExistError(); } + if (result.isSetServiceBusyError()) { + throw result.getServiceBusyError(); + } + if (result.isSetDomainNotActiveError()) { + throw result.getDomainNotActiveError(); + } + if (result.isSetLimitExceededError()) { + throw result.getLimitExceededError(); + } throw new TException("RespondActivityTaskCompletedByID failed with unknown error:" + result); } finally { if (response != null) { @@ -1019,6 +1198,15 @@ private void respondActivityTaskFailed(RespondActivityTaskFailedRequest failRequ if (result.isSetEntityNotExistError()) { throw result.getEntityNotExistError(); } + if (result.isSetServiceBusyError()) { + throw result.getServiceBusyError(); + } + if (result.isSetDomainNotActiveError()) { + throw result.getDomainNotActiveError(); + } + if (result.isSetLimitExceededError()) { + throw result.getLimitExceededError(); + } throw new TException("RespondActivityTaskFailed failed with unknown error:" + result); } finally { if (response != null) { @@ -1058,6 +1246,15 @@ private void respondActivityTaskFailedByID(RespondActivityTaskFailedByIDRequest if (result.isSetEntityNotExistError()) { throw result.getEntityNotExistError(); } + if (result.isSetServiceBusyError()) { + throw result.getServiceBusyError(); + } + if (result.isSetDomainNotActiveError()) { + throw result.getDomainNotActiveError(); + } + if (result.isSetLimitExceededError()) { + throw result.getLimitExceededError(); + } throw new TException("RespondActivityTaskFailedByID failedByID with unknown error:" + result); } finally { if (response != null) { @@ -1096,6 +1293,15 @@ private void respondActivityTaskCanceled(RespondActivityTaskCanceledRequest canc if (result.isSetEntityNotExistError()) { throw result.getEntityNotExistError(); } + if (result.isSetServiceBusyError()) { + throw result.getServiceBusyError(); + } + if (result.isSetDomainNotActiveError()) { + throw result.getDomainNotActiveError(); + } + if (result.isSetLimitExceededError()) { + throw result.getLimitExceededError(); + } throw new TException("RespondActivityTaskCanceled failed with unknown error:" + result); } finally { if (response != null) { @@ -1135,6 +1341,15 @@ private void respondActivityTaskCanceledByID( if (result.isSetEntityNotExistError()) { throw result.getEntityNotExistError(); } + if (result.isSetServiceBusyError()) { + throw result.getServiceBusyError(); + } + if (result.isSetDomainNotActiveError()) { + throw result.getDomainNotActiveError(); + } + if (result.isSetLimitExceededError()) { + throw result.getLimitExceededError(); + } throw new TException("RespondActivityTaskCanceledByID failed with unknown error:" + result); } finally { if (response != null) { @@ -1181,6 +1396,12 @@ private void requestCancelWorkflowExecution(RequestCancelWorkflowExecutionReques if (result.isSetServiceBusyError()) { throw result.getServiceBusyError(); } + if (result.isSetDomainNotActiveError()) { + throw result.getDomainNotActiveError(); + } + if (result.isSetLimitExceededError()) { + throw result.getLimitExceededError(); + } throw new TException("RequestCancelWorkflowExecution failed with unknown error:" + result); } finally { if (response != null) { @@ -1221,6 +1442,12 @@ private void signalWorkflowExecution(SignalWorkflowExecutionRequest signalReques if (result.isSetServiceBusyError()) { throw result.getServiceBusyError(); } + if (result.isSetDomainNotActiveError()) { + throw result.getDomainNotActiveError(); + } + if (result.isSetLimitExceededError()) { + throw result.getLimitExceededError(); + } throw new TException("SignalWorkflowExecution failed with unknown error:" + result); } finally { if (response != null) { @@ -1229,6 +1456,61 @@ private void signalWorkflowExecution(SignalWorkflowExecutionRequest signalReques } } + @Override + public StartWorkflowExecutionResponse SignalWithStartWorkflowExecution( + SignalWithStartWorkflowExecutionRequest signalWithStartRequest) + throws BadRequestError, InternalServiceError, EntityNotExistsError, ServiceBusyError, + DomainNotActiveError, LimitExceededError, WorkflowExecutionAlreadyStartedError, + TException { + return measureRemoteCall( + ServiceMethod.SIGNAL_WITH_START_WORKFLOW_EXECUTION, + () -> signalWithStartWorkflowExecution(signalWithStartRequest)); + } + + private StartWorkflowExecutionResponse signalWithStartWorkflowExecution( + SignalWithStartWorkflowExecutionRequest signalWithStartRequest) throws TException { + ThriftResponse response = null; + try { + ThriftRequest request = + buildThriftRequest( + "SignalWithStartWorkflowExecution", + new WorkflowService.SignalWithStartWorkflowExecution_args(signalWithStartRequest), + options.getRpcLongPollTimeoutMillis()); + response = doRemoteCall(request); + WorkflowService.SignalWithStartWorkflowExecution_result result = + response.getBody(WorkflowService.SignalWithStartWorkflowExecution_result.class); + if (response.getResponseCode() == ResponseCode.OK) { + return result.getSuccess(); + } + if (result.isSetBadRequestError()) { + throw result.getBadRequestError(); + } + if (result.isSetInternalServiceError()) { + throw result.getInternalServiceError(); + } + if (result.isSetEntityNotExistError()) { + throw result.getEntityNotExistError(); + } + if (result.isSetServiceBusyError()) { + throw result.getServiceBusyError(); + } + if (result.isSetDomainNotActiveError()) { + throw result.getDomainNotActiveError(); + } + if (result.isSetLimitExceededError()) { + throw result.getLimitExceededError(); + } + if (result.isSetDomainNotActiveError()) { + throw result.getDomainNotActiveError(); + } + throw new TException("SignalWithStartWorkflowExecution failed with unknown error:" + result); + } finally { + if (response != null) { + response.release(); + } + } + } + @Override public void TerminateWorkflowExecution(TerminateWorkflowExecutionRequest request) throws TException { @@ -1262,6 +1544,12 @@ private void terminateWorkflowExecution(TerminateWorkflowExecutionRequest termin if (result.isSetServiceBusyError()) { throw result.getServiceBusyError(); } + if (result.isSetDomainNotActiveError()) { + throw result.getDomainNotActiveError(); + } + if (result.isSetLimitExceededError()) { + throw result.getLimitExceededError(); + } throw new TException("TerminateWorkflowExecution failed with unknown error:" + result); } finally { if (response != null) { @@ -1303,6 +1591,9 @@ private ListOpenWorkflowExecutionsResponse listOpenWorkflowExecutions( if (result.isSetServiceBusyError()) { throw result.getServiceBusyError(); } + if (result.isSetLimitExceededError()) { + throw result.getLimitExceededError(); + } throw new TException("ListOpenWorkflowExecutions failed with unknown error:" + result); } finally { if (response != null) { @@ -1382,6 +1673,15 @@ private void respondQueryTaskCompleted(RespondQueryTaskCompletedRequest complete if (result.isSetEntityNotExistError()) { throw result.getEntityNotExistError(); } + if (result.isSetServiceBusyError()) { + throw result.getServiceBusyError(); + } + if (result.isSetDomainNotActiveError()) { + throw result.getDomainNotActiveError(); + } + if (result.isSetLimitExceededError()) { + throw result.getLimitExceededError(); + } throw new TException("RespondQueryTaskCompleted failed with unknown error:" + result); } finally { if (response != null) { @@ -1430,6 +1730,55 @@ private QueryWorkflowResponse queryWorkflow(QueryWorkflowRequest queryRequest) t } } + @Override + public ResetStickyTaskListResponse ResetStickyTaskList(ResetStickyTaskListRequest resetRequest) + throws BadRequestError, InternalServiceError, EntityNotExistsError, LimitExceededError, + ServiceBusyError, DomainNotActiveError, TException { + return measureRemoteCall( + ServiceMethod.RESET_STICKY_TASK_LIST, () -> resetStickyTaskList(resetRequest)); + } + + private ResetStickyTaskListResponse resetStickyTaskList(ResetStickyTaskListRequest queryRequest) + throws TException { + ThriftResponse response = null; + try { + ThriftRequest request = + buildThriftRequest( + "ResetStickyTaskList", + new WorkflowService.ResetStickyTaskList_args(queryRequest), + options.getRpcQueryTimeoutMillis()); + response = doRemoteCall(request); + WorkflowService.ResetStickyTaskList_result result = + response.getBody(WorkflowService.ResetStickyTaskList_result.class); + if (response.getResponseCode() == ResponseCode.OK) { + return result.getSuccess(); + } + if (result.isSetBadRequestError()) { + throw result.getBadRequestError(); + } + if (result.isSetInternalServiceError()) { + throw result.getInternalServiceError(); + } + if (result.isSetEntityNotExistError()) { + throw result.getEntityNotExistError(); + } + if (result.isSetServiceBusyError()) { + throw result.getServiceBusyError(); + } + if (result.isSetDomainNotActiveError()) { + throw result.getDomainNotActiveError(); + } + if (result.isSetLimitExceededError()) { + throw result.getLimitExceededError(); + } + throw new TException("ResetStickyTaskList failed with unknown error:" + result); + } finally { + if (response != null) { + response.release(); + } + } + } + @Override public DescribeWorkflowExecutionResponse DescribeWorkflowExecution( DescribeWorkflowExecutionRequest request) throws TException { @@ -1460,6 +1809,12 @@ private DescribeWorkflowExecutionResponse describeWorkflowExecution( if (result.isSetEntityNotExistError()) { throw result.getEntityNotExistError(); } + if (result.isSetServiceBusyError()) { + throw result.getServiceBusyError(); + } + if (result.isSetLimitExceededError()) { + throw result.getLimitExceededError(); + } throw new TException("DescribeWorkflowExecution failed with unknown error:" + result); } finally { if (response != null) { @@ -1496,6 +1851,12 @@ private DescribeTaskListResponse describeTaskList(DescribeTaskListRequest descri if (result.isSetEntityNotExistError()) { throw result.getEntityNotExistError(); } + if (result.isSetServiceBusyError()) { + throw result.getServiceBusyError(); + } + if (result.isSetLimitExceededError()) { + throw result.getLimitExceededError(); + } throw new TException("DescribeTaskList failed with unknown error:" + result); } finally { if (response != null) { @@ -1504,31 +1865,6 @@ private DescribeTaskListResponse describeTaskList(DescribeTaskListRequest descri } } - @Override - public void RegisterDomain( - RegisterDomainRequest registerRequest, AsyncMethodCallback resultHandler) throws TException { - throw new UnsupportedOperationException("not implemented"); - } - - @Override - public void DescribeDomain( - DescribeDomainRequest describeRequest, AsyncMethodCallback resultHandler) throws TException { - throw new UnsupportedOperationException("not implemented"); - } - - @Override - public void UpdateDomain(UpdateDomainRequest updateRequest, AsyncMethodCallback resultHandler) - throws TException { - throw new UnsupportedOperationException("not implemented"); - } - - @Override - public void DeprecateDomain( - DeprecateDomainRequest deprecateRequest, AsyncMethodCallback resultHandler) - throws TException { - throw new UnsupportedOperationException("not implemented"); - } - @Override public void StartWorkflowExecution( StartWorkflowExecutionRequest startRequest, AsyncMethodCallback resultHandler) @@ -1629,6 +1965,13 @@ public void RecordActivityTaskHeartbeat( throw new UnsupportedOperationException("not implemented"); } + @Override + public void RecordActivityTaskHeartbeatByID( + RecordActivityTaskHeartbeatByIDRequest heartbeatRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new UnsupportedOperationException("not implemented"); + } + @Override public void RespondActivityTaskCompleted( RespondActivityTaskCompletedRequest completeRequest, AsyncMethodCallback resultHandler) @@ -1685,6 +2028,14 @@ public void SignalWorkflowExecution( throw new UnsupportedOperationException("not implemented"); } + @Override + public void SignalWithStartWorkflowExecution( + SignalWithStartWorkflowExecutionRequest signalWithStartRequest, + AsyncMethodCallback resultHandler) + throws TException { + throw new UnsupportedOperationException("not implemented"); + } + @Override public void TerminateWorkflowExecution( TerminateWorkflowExecutionRequest terminateRequest, AsyncMethodCallback resultHandler) @@ -1713,6 +2064,13 @@ public void RespondQueryTaskCompleted( throw new UnsupportedOperationException("not implemented"); } + @Override + public void ResetStickyTaskList( + ResetStickyTaskListRequest resetRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new UnsupportedOperationException("not implemented"); + } + @Override public void QueryWorkflow(QueryWorkflowRequest queryRequest, AsyncMethodCallback resultHandler) throws TException { @@ -1731,4 +2089,35 @@ public void DescribeTaskList(DescribeTaskListRequest request, AsyncMethodCallbac throws TException { throw new UnsupportedOperationException("not implemented"); } + + @Override + public void RegisterDomain( + RegisterDomainRequest registerRequest, AsyncMethodCallback resultHandler) throws TException { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public void DescribeDomain( + DescribeDomainRequest describeRequest, AsyncMethodCallback resultHandler) throws TException { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public void ListDomains(ListDomainsRequest listRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public void UpdateDomain(UpdateDomainRequest updateRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public void DeprecateDomain( + DeprecateDomainRequest deprecateRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new UnsupportedOperationException("not implemented"); + } } diff --git a/src/main/java/com/uber/cadence/testing/TestEnvironmentOptions.java b/src/main/java/com/uber/cadence/testing/TestEnvironmentOptions.java index cec3ef9da..9ec39e17d 100644 --- a/src/main/java/com/uber/cadence/testing/TestEnvironmentOptions.java +++ b/src/main/java/com/uber/cadence/testing/TestEnvironmentOptions.java @@ -93,7 +93,7 @@ public TestEnvironmentOptions build() { } if (factoryOptions == null) { - factoryOptions = new Worker.FactoryOptions.Builder().Build(); + factoryOptions = new Worker.FactoryOptions.Builder().build(); } return new TestEnvironmentOptions( diff --git a/src/main/java/com/uber/cadence/worker/Worker.java b/src/main/java/com/uber/cadence/worker/Worker.java index 0902c34c4..431a3421b 100644 --- a/src/main/java/com/uber/cadence/worker/Worker.java +++ b/src/main/java/com/uber/cadence/worker/Worker.java @@ -42,9 +42,6 @@ import com.uber.cadence.workflow.WorkflowMethod; import com.uber.m3.tally.Scope; import com.uber.m3.util.ImmutableMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.lang.reflect.Type; import java.net.InetAddress; import java.net.UnknownHostException; @@ -59,6 +56,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Hosts activity and workflow implementations. Uses long poll to receive activity and decision @@ -428,7 +427,7 @@ public Factory(IWorkflowService workflowService, String domain, FactoryOptions f Objects.requireNonNull(workflowService, "workflowService should not be null"); factoryOptions = - factoryOptions == null ? new FactoryOptions.Builder().Build() : factoryOptions; + factoryOptions == null ? new FactoryOptions.Builder().build() : factoryOptions; this.factoryOptions = factoryOptions; workflowThreadPool = @@ -558,10 +557,10 @@ public void shutdown(Duration timeout) { } for (Worker worker : workers) { - try{ + try { worker.shutdown(timeout); } catch (InterruptedException e) { - log.warn("Interrupted exception thrown during worker shutdown.",e); + log.warn("Interrupted exception thrown during worker shutdown.", e); } } } @@ -656,7 +655,7 @@ public Builder setMetricScope(Scope metricScope) { return this; } - public FactoryOptions Build() { + public FactoryOptions build() { return new FactoryOptions( disableStickyExecution, cacheMaximumSize, diff --git a/src/main/thrift/cadence.thrift b/src/main/thrift/cadence.thrift index cfa2597c5..15e99e34f 100644 --- a/src/main/thrift/cadence.thrift +++ b/src/main/thrift/cadence.thrift @@ -42,6 +42,7 @@ service WorkflowService { 1: shared.BadRequestError badRequestError, 2: shared.InternalServiceError internalServiceError, 3: shared.DomainAlreadyExistsError domainExistsError, + 4: shared.ServiceBusyError serviceBusyError, ) /** @@ -52,8 +53,20 @@ service WorkflowService { 1: shared.BadRequestError badRequestError, 2: shared.InternalServiceError internalServiceError, 3: shared.EntityNotExistsError entityNotExistError, + 4: shared.ServiceBusyError serviceBusyError, ) + /** + * ListDomains returns the information and configuration for all domains. + **/ + shared.ListDomainsResponse ListDomains(1: shared.ListDomainsRequest listRequest) + throws ( + 1: shared.BadRequestError badRequestError, + 2: shared.InternalServiceError internalServiceError, + 3: shared.EntityNotExistsError entityNotExistError, + 4: shared.ServiceBusyError serviceBusyError, + ) + /** * UpdateDomain is used to update the information and configuration for a registered domain. **/ @@ -62,6 +75,8 @@ service WorkflowService { 1: shared.BadRequestError badRequestError, 2: shared.InternalServiceError internalServiceError, 3: shared.EntityNotExistsError entityNotExistError, + 4: shared.ServiceBusyError serviceBusyError, + 5: shared.DomainNotActiveError domainNotActiveError, ) /** @@ -74,6 +89,8 @@ service WorkflowService { 1: shared.BadRequestError badRequestError, 2: shared.InternalServiceError internalServiceError, 3: shared.EntityNotExistsError entityNotExistError, + 4: shared.ServiceBusyError serviceBusyError, + 5: shared.DomainNotActiveError domainNotActiveError, ) /** @@ -88,6 +105,9 @@ service WorkflowService { 2: shared.InternalServiceError internalServiceError, 3: shared.WorkflowExecutionAlreadyStartedError sessionAlreadyExistError, 4: shared.ServiceBusyError serviceBusyError, + 5: shared.DomainNotActiveError domainNotActiveError, + 6: shared.LimitExceededError limitExceededError, + 7: shared.EntityNotExistsError entityNotExistError, ) /** @@ -114,6 +134,9 @@ service WorkflowService { 1: shared.BadRequestError badRequestError, 2: shared.InternalServiceError internalServiceError, 3: shared.ServiceBusyError serviceBusyError, + 4: shared.LimitExceededError limitExceededError, + 5: shared.EntityNotExistsError entityNotExistError, + 6: shared.DomainNotActiveError domainNotActiveError, ) /** @@ -122,12 +145,16 @@ service WorkflowService { * potentially new ActivityTask being created for corresponding decisions. It will also create a DecisionTaskCompleted * event in the history for that session. Use the 'taskToken' provided as response of PollForDecisionTask API call * for completing the DecisionTask. + * The response could contain a new decision task if there is one or if the request asking for one. **/ - void RespondDecisionTaskCompleted(1: shared.RespondDecisionTaskCompletedRequest completeRequest) + shared.RespondDecisionTaskCompletedResponse RespondDecisionTaskCompleted(1: shared.RespondDecisionTaskCompletedRequest completeRequest) throws ( 1: shared.BadRequestError badRequestError, 2: shared.InternalServiceError internalServiceError, 3: shared.EntityNotExistsError entityNotExistError, + 4: shared.DomainNotActiveError domainNotActiveError, + 5: shared.LimitExceededError limitExceededError, + 6: shared.ServiceBusyError serviceBusyError, ) /** @@ -141,6 +168,9 @@ service WorkflowService { 1: shared.BadRequestError badRequestError, 2: shared.InternalServiceError internalServiceError, 3: shared.EntityNotExistsError entityNotExistError, + 4: shared.DomainNotActiveError domainNotActiveError, + 5: shared.LimitExceededError limitExceededError, + 6: shared.ServiceBusyError serviceBusyError, ) /** @@ -157,6 +187,9 @@ service WorkflowService { 1: shared.BadRequestError badRequestError, 2: shared.InternalServiceError internalServiceError, 3: shared.ServiceBusyError serviceBusyError, + 4: shared.LimitExceededError limitExceededError, + 5: shared.EntityNotExistsError entityNotExistError, + 6: shared.DomainNotActiveError domainNotActiveError, ) /** @@ -171,6 +204,26 @@ service WorkflowService { 1: shared.BadRequestError badRequestError, 2: shared.InternalServiceError internalServiceError, 3: shared.EntityNotExistsError entityNotExistError, + 4: shared.DomainNotActiveError domainNotActiveError, + 5: shared.LimitExceededError limitExceededError, + 6: shared.ServiceBusyError serviceBusyError, + ) + + /** + * RecordActivityTaskHeartbeatByID is called by application worker while it is processing an ActivityTask. If worker fails + * to heartbeat within 'heartbeatTimeoutSeconds' interval for the ActivityTask, then it will be marked as timedout and + * 'ActivityTaskTimedOut' event will be written to the workflow history. Calling 'RecordActivityTaskHeartbeatByID' will + * fail with 'EntityNotExistsError' in such situations. Instead of using 'taskToken' like in RecordActivityTaskHeartbeat, + * use Domain, WorkflowID and ActivityID + **/ + shared.RecordActivityTaskHeartbeatResponse RecordActivityTaskHeartbeatByID(1: shared.RecordActivityTaskHeartbeatByIDRequest heartbeatRequest) + throws ( + 1: shared.BadRequestError badRequestError, + 2: shared.InternalServiceError internalServiceError, + 3: shared.EntityNotExistsError entityNotExistError, + 4: shared.DomainNotActiveError domainNotActiveError, + 5: shared.LimitExceededError limitExceededError, + 6: shared.ServiceBusyError serviceBusyError, ) /** @@ -185,12 +238,15 @@ service WorkflowService { 1: shared.BadRequestError badRequestError, 2: shared.InternalServiceError internalServiceError, 3: shared.EntityNotExistsError entityNotExistError, + 4: shared.DomainNotActiveError domainNotActiveError, + 5: shared.LimitExceededError limitExceededError, + 6: shared.ServiceBusyError serviceBusyError, ) /** * RespondActivityTaskCompletedByID is called by application worker when it is done processing an ActivityTask. * It will result in a new 'ActivityTaskCompleted' event being written to the workflow history and a new DecisionTask - * created for the workflow so new decisions could be made. Similar to RespondActivityTaskCompleted but use DomainID, + * created for the workflow so new decisions could be made. Similar to RespondActivityTaskCompleted but use Domain, * WorkflowID and ActivityID instead of 'taskToken' for completion. It fails with 'EntityNotExistsError' * if the these IDs are not valid anymore due to activity timeout. **/ @@ -199,6 +255,9 @@ service WorkflowService { 1: shared.BadRequestError badRequestError, 2: shared.InternalServiceError internalServiceError, 3: shared.EntityNotExistsError entityNotExistError, + 4: shared.DomainNotActiveError domainNotActiveError, + 5: shared.LimitExceededError limitExceededError, + 6: shared.ServiceBusyError serviceBusyError, ) /** @@ -213,13 +272,16 @@ service WorkflowService { 1: shared.BadRequestError badRequestError, 2: shared.InternalServiceError internalServiceError, 3: shared.EntityNotExistsError entityNotExistError, + 4: shared.DomainNotActiveError domainNotActiveError, + 5: shared.LimitExceededError limitExceededError, + 6: shared.ServiceBusyError serviceBusyError, ) /** * RespondActivityTaskFailedByID is called by application worker when it is done processing an ActivityTask. * It will result in a new 'ActivityTaskFailed' event being written to the workflow history and a new DecisionTask * created for the workflow instance so new decisions could be made. Similar to RespondActivityTaskFailed but use - * DomainID, WorkflowID and ActivityID instead of 'taskToken' for completion. It fails with 'EntityNotExistsError' + * Domain, WorkflowID and ActivityID instead of 'taskToken' for completion. It fails with 'EntityNotExistsError' * if the these IDs are not valid anymore due to activity timeout. **/ void RespondActivityTaskFailedByID(1: shared.RespondActivityTaskFailedByIDRequest failRequest) @@ -227,6 +289,9 @@ service WorkflowService { 1: shared.BadRequestError badRequestError, 2: shared.InternalServiceError internalServiceError, 3: shared.EntityNotExistsError entityNotExistError, + 4: shared.DomainNotActiveError domainNotActiveError, + 5: shared.LimitExceededError limitExceededError, + 6: shared.ServiceBusyError serviceBusyError, ) /** @@ -241,13 +306,16 @@ service WorkflowService { 1: shared.BadRequestError badRequestError, 2: shared.InternalServiceError internalServiceError, 3: shared.EntityNotExistsError entityNotExistError, + 4: shared.DomainNotActiveError domainNotActiveError, + 5: shared.LimitExceededError limitExceededError, + 6: shared.ServiceBusyError serviceBusyError, ) /** * RespondActivityTaskCanceledByID is called by application worker when it is successfully canceled an ActivityTask. * It will result in a new 'ActivityTaskCanceled' event being written to the workflow history and a new DecisionTask * created for the workflow instance so new decisions could be made. Similar to RespondActivityTaskCanceled but use - * DomainID, WorkflowID and ActivityID instead of 'taskToken' for completion. It fails with 'EntityNotExistsError' + * Domain, WorkflowID and ActivityID instead of 'taskToken' for completion. It fails with 'EntityNotExistsError' * if the these IDs are not valid anymore due to activity timeout. **/ void RespondActivityTaskCanceledByID(1: shared.RespondActivityTaskCanceledByIDRequest canceledRequest) @@ -255,6 +323,9 @@ service WorkflowService { 1: shared.BadRequestError badRequestError, 2: shared.InternalServiceError internalServiceError, 3: shared.EntityNotExistsError entityNotExistError, + 4: shared.DomainNotActiveError domainNotActiveError, + 5: shared.LimitExceededError limitExceededError, + 6: shared.ServiceBusyError serviceBusyError, ) /** @@ -264,13 +335,15 @@ service WorkflowService { * anymore due to completion or doesn't exist. **/ void RequestCancelWorkflowExecution(1: shared.RequestCancelWorkflowExecutionRequest cancelRequest) - throws ( - 1: shared.BadRequestError badRequestError, - 2: shared.InternalServiceError internalServiceError, - 3: shared.EntityNotExistsError entityNotExistError, - 4: shared.CancellationAlreadyRequestedError cancellationAlreadyRequestedError, - 5: shared.ServiceBusyError serviceBusyError, - ) + throws ( + 1: shared.BadRequestError badRequestError, + 2: shared.InternalServiceError internalServiceError, + 3: shared.EntityNotExistsError entityNotExistError, + 4: shared.CancellationAlreadyRequestedError cancellationAlreadyRequestedError, + 5: shared.ServiceBusyError serviceBusyError, + 6: shared.DomainNotActiveError domainNotActiveError, + 7: shared.LimitExceededError limitExceededError, + ) /** * SignalWorkflowExecution is used to send a signal event to running workflow execution. This results in @@ -282,6 +355,26 @@ service WorkflowService { 2: shared.InternalServiceError internalServiceError, 3: shared.EntityNotExistsError entityNotExistError, 4: shared.ServiceBusyError serviceBusyError, + 5: shared.DomainNotActiveError domainNotActiveError, + 6: shared.LimitExceededError limitExceededError, + ) + + /** + * SignalWithStartWorkflowExecution is used to ensure sending signal to a workflow. + * If the workflow is running, this results in WorkflowExecutionSignaled event being recorded in the history + * and a decision task being created for the execution. + * If the workflow is not running or not found, this results in WorkflowExecutionStarted and WorkflowExecutionSignaled + * events being recorded in history, and a decision task being created for the execution + **/ + shared.StartWorkflowExecutionResponse SignalWithStartWorkflowExecution(1: shared.SignalWithStartWorkflowExecutionRequest signalWithStartRequest) + throws ( + 1: shared.BadRequestError badRequestError, + 2: shared.InternalServiceError internalServiceError, + 3: shared.EntityNotExistsError entityNotExistError, + 4: shared.ServiceBusyError serviceBusyError, + 5: shared.DomainNotActiveError domainNotActiveError, + 6: shared.LimitExceededError limitExceededError, + 7: shared.WorkflowExecutionAlreadyStartedError workflowAlreadyStartedError, ) /** @@ -294,6 +387,8 @@ service WorkflowService { 2: shared.InternalServiceError internalServiceError, 3: shared.EntityNotExistsError entityNotExistError, 4: shared.ServiceBusyError serviceBusyError, + 5: shared.DomainNotActiveError domainNotActiveError, + 6: shared.LimitExceededError limitExceededError, ) /** @@ -305,6 +400,7 @@ service WorkflowService { 2: shared.InternalServiceError internalServiceError, 3: shared.EntityNotExistsError entityNotExistError, 4: shared.ServiceBusyError serviceBusyError, + 5: shared.LimitExceededError limitExceededError, ) /** @@ -328,6 +424,28 @@ service WorkflowService { 1: shared.BadRequestError badRequestError, 2: shared.InternalServiceError internalServiceError, 3: shared.EntityNotExistsError entityNotExistError, + 4: shared.LimitExceededError limitExceededError, + 5: shared.ServiceBusyError serviceBusyError, + 6: shared.DomainNotActiveError domainNotActiveError, + ) + + /** + * Reset the sticky tasklist related information in mutable state of a given workflow. + * Things cleared are: + * 1. StickyTaskList + * 2. StickyScheduleToStartTimeout + * 3. ClientLibraryVersion + * 4. ClientFeatureVersion + * 5. ClientImpl + **/ + shared.ResetStickyTaskListResponse ResetStickyTaskList(1: shared.ResetStickyTaskListRequest resetRequest) + throws ( + 1: shared.BadRequestError badRequestError, + 2: shared.InternalServiceError internalServiceError, + 3: shared.EntityNotExistsError entityNotExistError, + 4: shared.LimitExceededError limitExceededError, + 5: shared.ServiceBusyError serviceBusyError, + 6: shared.DomainNotActiveError domainNotActiveError, ) /** @@ -339,6 +457,8 @@ service WorkflowService { 2: shared.InternalServiceError internalServiceError, 3: shared.EntityNotExistsError entityNotExistError, 4: shared.QueryFailedError queryFailedError, + 5: shared.LimitExceededError limitExceededError, + 6: shared.ServiceBusyError serviceBusyError, ) /** @@ -349,6 +469,8 @@ service WorkflowService { 1: shared.BadRequestError badRequestError, 2: shared.InternalServiceError internalServiceError, 3: shared.EntityNotExistsError entityNotExistError, + 4: shared.LimitExceededError limitExceededError, + 5: shared.ServiceBusyError serviceBusyError, ) /** @@ -357,9 +479,11 @@ service WorkflowService { **/ shared.DescribeTaskListResponse DescribeTaskList(1: shared.DescribeTaskListRequest request) throws ( - 1: shared.BadRequestError badRequestError, - 2: shared.InternalServiceError internalServiceError, - 3: shared.EntityNotExistsError entityNotExistError, - ) + 1: shared.BadRequestError badRequestError, + 2: shared.InternalServiceError internalServiceError, + 3: shared.EntityNotExistsError entityNotExistError, + 4: shared.LimitExceededError limitExceededError, + 5: shared.ServiceBusyError serviceBusyError, + ) } diff --git a/src/main/thrift/shared.thrift b/src/main/thrift/shared.thrift index f19f47b99..139249f7a 100644 --- a/src/main/thrift/shared.thrift +++ b/src/main/thrift/shared.thrift @@ -54,6 +54,25 @@ exception QueryFailedError { 1: required string message } +exception DomainNotActiveError { + 1: required string message + 2: required string domainName + 3: required string currentCluster + 4: required string activeCluster +} + +exception LimitExceededError { + 1: required string message +} + +exception AccessDeniedError { + 1: required string message +} + +exception RetryTaskError { + 1: required string message +} + enum WorkflowIdReusePolicy { /* * allow start a workflow execution using the same workflow ID, @@ -85,6 +104,10 @@ enum TimeoutType { HEARTBEAT, } +// whenever this list of decision is changed +// do change the mutableStateBuilder.go +// function shouldBufferEvent +// to make sure wo do the correct event ordering enum DecisionType { ScheduleActivityTask, RequestCancelActivityTask, @@ -161,6 +184,9 @@ enum DecisionTaskFailedCause { WORKFLOW_WORKER_UNHANDLED_FAILURE, BAD_SIGNAL_WORKFLOW_EXECUTION_ATTRIBUTES, BAD_START_CHILD_EXECUTION_ATTRIBUTES, + FORCE_CLOSE_DECISION, + FAILOVER_CLOSE_DECISION, + BAD_SIGNAL_INPUT_SIZE, } enum CancelExternalWorkflowExecutionFailedCause { @@ -211,6 +237,10 @@ enum TaskListKind { STICKY, } +struct Header { + 10: optional map fields +} + struct WorkflowType { 10: optional string name } @@ -264,6 +294,7 @@ struct ScheduleActivityTaskDecisionAttributes { 50: optional i32 scheduleToStartTimeoutSeconds 55: optional i32 startToCloseTimeoutSeconds 60: optional i32 heartbeatTimeoutSeconds + 70: optional RetryPolicy retryPolicy } struct RequestCancelActivityTaskDecisionAttributes { @@ -312,6 +343,7 @@ struct SignalExternalWorkflowExecutionDecisionAttributes { struct RecordMarkerDecisionAttributes { 10: optional string markerName 20: optional binary details + 30: optional Header header } struct ContinueAsNewWorkflowExecutionDecisionAttributes { @@ -320,6 +352,8 @@ struct ContinueAsNewWorkflowExecutionDecisionAttributes { 30: optional binary input 40: optional i32 executionStartToCloseTimeoutSeconds 50: optional i32 taskStartToCloseTimeoutSeconds + 60: optional i32 backoffStartIntervalInSeconds + 70: optional RetryPolicy retryPolicy } struct StartChildWorkflowExecutionDecisionAttributes { @@ -333,6 +367,7 @@ struct StartChildWorkflowExecutionDecisionAttributes { 80: optional ChildPolicy childPolicy 90: optional binary control 100: optional WorkflowIdReusePolicy workflowIdReusePolicy + 110: optional RetryPolicy retryPolicy } struct Decision { @@ -353,11 +388,19 @@ struct Decision { struct WorkflowExecutionStartedEventAttributes { 10: optional WorkflowType workflowType + 12: optional string parentWorkflowDomain + 14: optional WorkflowExecution parentWorkflowExecution + 16: optional i64 (js.type = "Long") parentInitiatedEventId 20: optional TaskList taskList 30: optional binary input 40: optional i32 executionStartToCloseTimeoutSeconds 50: optional i32 taskStartToCloseTimeoutSeconds + 52: optional ChildPolicy childPolicy + 54: optional string continuedExecutionRunId 60: optional string identity + 70: optional RetryPolicy retryPolicy + 80: optional i32 attempt + 90: optional i64 (js.type = "Long") expirationTimestamp } struct WorkflowExecutionCompletedEventAttributes { @@ -383,6 +426,7 @@ struct WorkflowExecutionContinuedAsNewEventAttributes { 50: optional i32 executionStartToCloseTimeoutSeconds 60: optional i32 taskStartToCloseTimeoutSeconds 70: optional i64 (js.type = "Long") decisionTaskCompletedEventId + 80: optional i32 backoffStartIntervalInSeconds } struct DecisionTaskScheduledEventAttributes { @@ -402,6 +446,7 @@ struct DecisionTaskCompletedEventAttributes { 20: optional i64 (js.type = "Long") scheduledEventId 30: optional i64 (js.type = "Long") startedEventId 40: optional string identity + 50: optional string binaryChecksum } struct DecisionTaskTimedOutEventAttributes { @@ -429,12 +474,14 @@ struct ActivityTaskScheduledEventAttributes { 55: optional i32 startToCloseTimeoutSeconds 60: optional i32 heartbeatTimeoutSeconds 90: optional i64 (js.type = "Long") decisionTaskCompletedEventId + 110: optional RetryPolicy retryPolicy } struct ActivityTaskStartedEventAttributes { 10: optional i64 (js.type = "Long") scheduledEventId 20: optional string identity 30: optional string requestId + 40: optional i32 attempt } struct ActivityTaskCompletedEventAttributes { @@ -519,6 +566,7 @@ struct MarkerRecordedEventAttributes { 10: optional string markerName 20: optional binary details 30: optional i64 (js.type = "Long") decisionTaskCompletedEventId + 40: optional Header header } struct WorkflowExecutionSignaledEventAttributes { @@ -594,6 +642,7 @@ struct StartChildWorkflowExecutionInitiatedEventAttributes { 90: optional binary control 100: optional i64 (js.type = "Long") decisionTaskCompletedEventId 110: optional WorkflowIdReusePolicy workflowIdReusePolicy + 120: optional RetryPolicy retryPolicy } struct StartChildWorkflowExecutionFailedEventAttributes { @@ -662,6 +711,7 @@ struct HistoryEvent { 10: optional i64 (js.type = "Long") eventId 20: optional i64 (js.type = "Long") timestamp 30: optional EventType eventType + 35: optional i64 (js.type = "Long") version 40: optional WorkflowExecutionStartedEventAttributes workflowExecutionStartedEventAttributes 50: optional WorkflowExecutionCompletedEventAttributes workflowExecutionCompletedEventAttributes 60: optional WorkflowExecutionFailedEventAttributes workflowExecutionFailedEventAttributes @@ -727,6 +777,8 @@ struct DomainInfo { 20: optional DomainStatus status 30: optional string description 40: optional string ownerEmail + // A key-value map for any customized purpose + 50: optional map data } struct DomainConfiguration { @@ -737,6 +789,8 @@ struct DomainConfiguration { struct UpdateDomainInfo { 10: optional string description 20: optional string ownerEmail + // A key-value map for any customized purpose + 30: optional map data } struct ClusterReplicationConfiguration { @@ -756,10 +810,23 @@ struct RegisterDomainRequest { 50: optional bool emitMetric 60: optional list clusters 70: optional string activeClusterName + // A key-value map for any customized purpose + 80: optional map data + 90: optional string securityToken +} + +struct ListDomainsRequest { + 10: optional i32 pageSize + 20: optional binary nextPageToken +} + +struct ListDomainsResponse { + 10: optional list domains + 20: optional binary nextPageToken } struct DescribeDomainRequest { - 10: optional string name + 10: optional string name } struct DescribeDomainResponse { @@ -775,6 +842,7 @@ struct UpdateDomainRequest { 20: optional UpdateDomainInfo updatedInfo 30: optional DomainConfiguration configuration 40: optional DomainReplicationConfiguration replicationConfiguration + 50: optional string securityToken } struct UpdateDomainResponse { @@ -787,6 +855,7 @@ struct UpdateDomainResponse { struct DeprecateDomainRequest { 10: optional string name + 20: optional string securityToken } struct StartWorkflowExecutionRequest { @@ -800,6 +869,8 @@ struct StartWorkflowExecutionRequest { 80: optional string identity 90: optional string requestId 100: optional WorkflowIdReusePolicy workflowIdReusePolicy + 110: optional ChildPolicy childPolicy + 120: optional RetryPolicy retryPolicy } struct StartWorkflowExecutionResponse { @@ -837,6 +908,13 @@ struct RespondDecisionTaskCompletedRequest { 30: optional binary executionContext 40: optional string identity 50: optional StickyExecutionAttributes stickyAttributes + 60: optional bool returnNewDecisionTask + 70: optional bool forceCreateNewDecisionTask + 80: optional string binaryChecksum +} + +struct RespondDecisionTaskCompletedResponse { + 10: optional PollForDecisionTaskResponse decisionTask } struct RespondDecisionTaskFailedRequest { @@ -864,6 +942,11 @@ struct PollForActivityTaskResponse { 90: optional i64 (js.type = "Long") startedTimestamp 100: optional i32 startToCloseTimeoutSeconds 110: optional i32 heartbeatTimeoutSeconds + 120: optional i32 attempt + 130: optional i64 (js.type = "Long") scheduledTimestampOfThisAttempt + 140: optional binary heartbeatDetails + 150: optional WorkflowType workflowType + 160: optional string workflowDomain } struct RecordActivityTaskHeartbeatRequest { @@ -872,6 +955,15 @@ struct RecordActivityTaskHeartbeatRequest { 30: optional string identity } +struct RecordActivityTaskHeartbeatByIDRequest { + 10: optional string domain + 20: optional string workflowID + 30: optional string runID + 40: optional string activityID + 50: optional binary details + 60: optional string identity +} + struct RecordActivityTaskHeartbeatResponse { 10: optional bool cancelRequested } @@ -954,6 +1046,23 @@ struct SignalWorkflowExecutionRequest { 70: optional binary control } +struct SignalWithStartWorkflowExecutionRequest { + 10: optional string domain + 20: optional string workflowId + 30: optional WorkflowType workflowType + 40: optional TaskList taskList + 50: optional binary input + 60: optional i32 executionStartToCloseTimeoutSeconds + 70: optional i32 taskStartToCloseTimeoutSeconds + 80: optional string identity + 90: optional string requestId + 100: optional WorkflowIdReusePolicy workflowIdReusePolicy + 110: optional string signalName + 120: optional binary signalInput + 130: optional binary control + 140: optional RetryPolicy retryPolicy +} + struct TerminateWorkflowExecutionRequest { 10: optional string domain 20: optional WorkflowExecution workflowExecution @@ -1006,6 +1115,16 @@ struct WorkflowQuery { 20: optional binary queryArgs } +struct ResetStickyTaskListRequest { + 10: optional string domain + 20: optional WorkflowExecution execution +} + +struct ResetStickyTaskListResponse { + // The reason to keep this response is to allow returning + // information in the future. +} + struct RespondQueryTaskCompletedRequest { 10: optional binary taskToken 20: optional QueryTaskCompletedType completedType @@ -1042,6 +1161,26 @@ struct DescribeTaskListResponse { 10: optional list pollers } +//At least one of the parameters needs to be provided +struct DescribeHistoryHostRequest { + 10: optional string hostAddress //ip:port + 20: optional i32 shardIdForHost + 30: optional WorkflowExecution executionForHost +} + +struct DescribeHistoryHostResponse{ + 10: optional i32 numberOfShards + 20: optional list shardIDs + 30: optional DomainCacheInfo domainCache + 40: optional string shardControllerStatus + 50: optional string address +} + +struct DomainCacheInfo{ + 10: optional i64 numOfItemsInCacheByID + 20: optional i64 numOfItemsInCacheByName +} + enum TaskListType { /* * Decision type of tasklist @@ -1058,3 +1197,27 @@ struct PollerInfo { 10: optional i64 (js.type = "Long") lastAccessTime 20: optional string identity } + +struct RetryPolicy { + // Interval of the first retry. If coefficient is 1.0 then it is used for all retries. + 10: optional i32 initialIntervalInSeconds + + // Coefficient used to calculate the next retry interval. + // The next retry interval is previous interval multiplied by the coefficient. + // Must be 1 or larger. + 20: optional double backoffCoefficient + + // Maximum interval between retries. Exponential backoff leads to interval increase. + // This value is the cap of the increase. Default is 100x of initial interval. + 30: optional i32 maximumIntervalInSeconds + + // Maximum number of attempts. When exceeded the retries stop even if not expired yet. + // Must be 1 or bigger. Default is unlimited. + 40: optional i32 maximumAttempts + + // Non-Retriable errors. Will stop retrying if error matches this list. + 50: optional list nonRetriableErrorReasons + + // Expiration time for the whole retry process. + 60: optional i32 expirationIntervalInSeconds +} diff --git a/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java b/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java index 20524b0e1..c95db3a1c 100644 --- a/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java +++ b/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java @@ -69,8 +69,10 @@ public class StickyWorkerTest { public static final String DOMAIN = "UnitTest"; - private static final boolean skipDockerService = - Boolean.parseBoolean(System.getenv("SKIP_DOCKER_SERVICE")); + // TODO: Enable for docker as soon as the server commit + // a36c84991664571636d37a3826b282ddbdbd2402 is released + private static final boolean skipDockerService = true; + // Boolean.parseBoolean(System.getenv("SKIP_DOCKER_SERVICE")); @Parameterized.Parameter public boolean useExternalService; @@ -101,7 +103,7 @@ public void whenStickyIsEnabledThenTheWorkflowIsCachedSignals() throws Exception TestEnvironmentWrapper wrapper = new TestEnvironmentWrapper( - new Worker.FactoryOptions.Builder().setMetricScope(scope).Build()); + new Worker.FactoryOptions.Builder().setMetricScope(scope).build()); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker(taskListName, new WorkerOptions.Builder().build()); worker.registerWorkflowImplementationTypes(GreetingSignalWorkflowImpl.class); @@ -156,7 +158,7 @@ public void whenStickyIsEnabledThenTheWorkflowIsCachedActivities() throws Except TestEnvironmentWrapper wrapper = new TestEnvironmentWrapper( - new Worker.FactoryOptions.Builder().setMetricScope(scope).Build()); + new Worker.FactoryOptions.Builder().setMetricScope(scope).build()); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker(taskListName, new WorkerOptions.Builder().build()); worker.registerWorkflowImplementationTypes(ActivitiesWorkflowImpl.class); @@ -211,7 +213,7 @@ public void whenStickyIsEnabledThenTheWorkflowIsCachedChildWorkflows() throws Ex TestEnvironmentWrapper wrapper = new TestEnvironmentWrapper( - new Worker.FactoryOptions.Builder().setMetricScope(scope).Build()); + new Worker.FactoryOptions.Builder().setMetricScope(scope).build()); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker(taskListName, new WorkerOptions.Builder().build()); worker.registerWorkflowImplementationTypes( @@ -259,7 +261,7 @@ public void whenStickyIsEnabledThenTheWorkflowIsCachedMutableSideEffect() throws TestEnvironmentWrapper wrapper = new TestEnvironmentWrapper( - new Worker.FactoryOptions.Builder().setMetricScope(scope).Build()); + new Worker.FactoryOptions.Builder().setMetricScope(scope).build()); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker(taskListName, new WorkerOptions.Builder().build()); worker.registerWorkflowImplementationTypes(TestMutableSideEffectWorkflowImpl.class); @@ -308,7 +310,7 @@ public void whenStickyIsNotEnabledThenTheWorkflowIsNotCached() { String taskListName = "notCachedStickyTest"; TestEnvironmentWrapper wrapper = new TestEnvironmentWrapper( - new Worker.FactoryOptions.Builder().setDisableStickyExecution(true).Build()); + new Worker.FactoryOptions.Builder().setDisableStickyExecution(true).build()); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker(taskListName); worker.registerWorkflowImplementationTypes(GreetingSignalWorkflowImpl.class); @@ -340,7 +342,7 @@ public void whenCacheIsEvictedTheWorkerCanRecover() throws Exception { // Arrange String taskListName = "evictedStickyTest"; TestEnvironmentWrapper wrapper = - new TestEnvironmentWrapper(new Worker.FactoryOptions.Builder().Build()); + new TestEnvironmentWrapper(new Worker.FactoryOptions.Builder().build()); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker(taskListName); worker.registerWorkflowImplementationTypes(GreetingSignalWorkflowImpl.class); @@ -379,7 +381,7 @@ public void workflowsCanBeQueried() throws Exception { // Arrange String taskListName = "queryStickyTest"; TestEnvironmentWrapper wrapper = - new TestEnvironmentWrapper(new Worker.FactoryOptions.Builder().Build()); + new TestEnvironmentWrapper(new Worker.FactoryOptions.Builder().build()); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker(taskListName); worker.registerWorkflowImplementationTypes(GreetingSignalWorkflowImpl.class); @@ -419,7 +421,7 @@ public void workflowsCanBeQueriedAfterEviction() throws Exception { // Arrange String taskListName = "queryEvictionStickyTest"; TestEnvironmentWrapper wrapper = - new TestEnvironmentWrapper(new Worker.FactoryOptions.Builder().Build()); + new TestEnvironmentWrapper(new Worker.FactoryOptions.Builder().build()); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker(taskListName); worker.registerWorkflowImplementationTypes(GreetingSignalWorkflowImpl.class); @@ -464,7 +466,7 @@ private class TestEnvironmentWrapper { public TestEnvironmentWrapper(Worker.FactoryOptions options) { if (options == null) { - options = new Worker.FactoryOptions.Builder().Build(); + options = new Worker.FactoryOptions.Builder().build(); } factory = new Worker.Factory(DOMAIN, options); TestEnvironmentOptions testOptions = diff --git a/src/test/java/com/uber/cadence/worker/WorkerStressTests.java b/src/test/java/com/uber/cadence/worker/WorkerStressTests.java index 94379f6ca..44512ce73 100644 --- a/src/test/java/com/uber/cadence/worker/WorkerStressTests.java +++ b/src/test/java/com/uber/cadence/worker/WorkerStressTests.java @@ -77,7 +77,7 @@ public void longHistoryWorkflowsCompleteSuccessfully() { TestEnvironmentWrapper wrapper = new TestEnvironmentWrapper( - new Worker.FactoryOptions.Builder().setMaxWorkflowThreadCount(200).Build()); + new Worker.FactoryOptions.Builder().setMaxWorkflowThreadCount(200).build()); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker(taskListName, new WorkerOptions.Builder().build()); worker.registerWorkflowImplementationTypes(ActivitiesWorkflowImpl.class); @@ -120,7 +120,7 @@ public void selfEvictionDoesNotCauseDeadlock() throws InterruptedException { new Worker.FactoryOptions.Builder() .setDisableStickyExecution(false) .setMaxWorkflowThreadCount(2) - .Build()); + .build()); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker(taskListName, new WorkerOptions.Builder().build()); worker.registerWorkflowImplementationTypes(ActivitiesWorkflowImpl.class); @@ -171,7 +171,7 @@ private class TestEnvironmentWrapper { public TestEnvironmentWrapper(Worker.FactoryOptions options) { if (options == null) { - options = new Worker.FactoryOptions.Builder().Build(); + options = new Worker.FactoryOptions.Builder().build(); } factory = new Worker.Factory(DOMAIN, options); TestEnvironmentOptions testOptions = diff --git a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java index 470716d6d..d0bedb7f0 100644 --- a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java +++ b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java @@ -119,7 +119,7 @@ public static Object[] data() { @Rule public Timeout globalTimeout = - Timeout.seconds(DEBUGGER_TIMEOUTS ? 500 : skipDockerService ? 5 : 20); + Timeout.seconds(DEBUGGER_TIMEOUTS ? 500 : skipDockerService ? 10 : 20); @Rule public TestWatcher watchman = @@ -207,9 +207,13 @@ public void setUp() { tracer = new TracingWorkflowInterceptorFactory(); // TODO: Create a version of TestWorkflowEnvironment that runs against a real service. if (useExternalService) { + // TODO: Enable sticky execution as soon as the server commit + // a36c84991664571636d37a3826b282ddbdbd2402 is released + Worker.FactoryOptions factoryOptions = + new Worker.FactoryOptions.Builder().setDisableStickyExecution(true).build(); + workerFactory = new Worker.Factory(service, DOMAIN, factoryOptions); WorkerOptions workerOptions = new WorkerOptions.Builder().setInterceptorFactory(tracer).build(); - workerFactory = new Worker.Factory(service, DOMAIN); worker = workerFactory.newWorker(taskList, workerOptions); workflowClient = WorkflowClient.newInstance(DOMAIN); WorkflowClientOptions clientOptions =