Skip to content

Commit

Permalink
Handle data corruption error in replication (#3895)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 committed Jan 14, 2021
1 parent 13303a3 commit 79ce048
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 14 deletions.
4 changes: 2 additions & 2 deletions common/persistence/cassandra/cassandraHistoryPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,11 @@ func (h *nosqlHistoryManager) ReadHistoryBranch(

switch {
case nodeID < lastNodeID:
return nil, &types.InternalServiceError{
return nil, &types.InternalDataInconsistencyError{
Message: fmt.Sprintf("corrupted data, nodeID cannot decrease"),
}
case nodeID == lastNodeID:
return nil, &types.InternalServiceError{
return nil, &types.InternalDataInconsistencyError{
Message: fmt.Sprintf("corrupted data, same nodeID must have smaller txnID"),
}
default: // row.NodeID > lastNodeID:
Expand Down
8 changes: 4 additions & 4 deletions common/persistence/historyStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func (m *historyV2ManagerImpl) readRawHistoryBranch(
}

if token.CurrentRangeIndex == notStartedIndex {
return nil, nil, 0, nil, &types.InternalServiceError{
return nil, nil, 0, nil, &types.InternalDataInconsistencyError{
Message: fmt.Sprintf("branchRange is corrupted"),
}
}
Expand Down Expand Up @@ -463,7 +463,7 @@ func (m *historyV2ManagerImpl) readHistoryBranch(
}
if len(events) == 0 {
logger.Error("Empty events in a batch")
return nil, nil, nil, 0, 0, &types.InternalServiceError{
return nil, nil, nil, 0, 0, &types.InternalDataInconsistencyError{
Message: fmt.Sprintf("corrupted history event batch, empty events"),
}
}
Expand All @@ -478,7 +478,7 @@ func (m *historyV2ManagerImpl) readHistoryBranch(
tag.FirstEventVersion(firstEvent.GetVersion()), tag.WorkflowFirstEventID(firstEvent.GetEventID()),
tag.LastEventVersion(lastEvent.GetVersion()), tag.WorkflowNextEventID(lastEvent.GetEventID()),
tag.Counter(eventCount))
return nil, nil, nil, 0, 0, &types.InternalServiceError{
return nil, nil, nil, 0, 0, &types.InternalDataInconsistencyError{
Message: fmt.Sprintf("corrupted history event batch, wrong version and IDs"),
}
}
Expand All @@ -504,7 +504,7 @@ func (m *historyV2ManagerImpl) readHistoryBranch(
tag.LastEventVersion(lastEvent.GetVersion()), tag.WorkflowNextEventID(lastEvent.GetEventID()),
tag.TokenLastEventVersion(token.LastEventVersion), tag.TokenLastEventID(token.LastEventID),
tag.Counter(eventCount))
return nil, nil, nil, 0, 0, &types.InternalServiceError{
return nil, nil, nil, 0, 0, &types.InternalDataInconsistencyError{
Message: fmt.Sprintf("corrupted history event batch, eventID is not continouous"),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ func (s *HistoryV2PersistenceSuite) TestConcurrentlyCreateAndAppendBranches() {

// read to verify override success, at this point history is corrupted, missing 7/8, so we should only see 6 events
_, err = s.readWithError(ctx, branch, 1, 25)
_, ok := err.(*types.InternalServiceError)
_, ok := err.(*types.InternalDataInconsistencyError)
s.Equal(true, ok)

events = s.read(ctx, branch, 1, 7)
Expand Down
6 changes: 3 additions & 3 deletions common/persistence/sql/sqlHistoryManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (m *sqlHistoryV2Manager) ReadHistoryBranch(
// -> batch with lower transaction ID is invalid (happens before)
// -> batch with higher transaction ID is valid
if row.NodeID < lastNodeID {
return nil, &types.InternalServiceError{
return nil, &types.InternalDataInconsistencyError{
Message: fmt.Sprintf("corrupted data, nodeID cannot decrease"),
}
} else if row.NodeID > lastNodeID {
Expand All @@ -213,11 +213,11 @@ func (m *sqlHistoryV2Manager) ReadHistoryBranch(

switch {
case row.NodeID < lastNodeID:
return nil, &types.InternalServiceError{
return nil, &types.InternalDataInconsistencyError{
Message: fmt.Sprintf("corrupted data, nodeID cannot decrease"),
}
case row.NodeID == lastNodeID:
return nil, &types.InternalServiceError{
return nil, &types.InternalDataInconsistencyError{
Message: fmt.Sprintf("corrupted data, same nodeID must have smaller txnID"),
}
default: // row.NodeID > lastNodeID:
Expand Down
14 changes: 10 additions & 4 deletions service/history/replication/task_ack_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func (t *taskAckManagerImpl) GetTasks(

var replicationTasks []*types.ReplicationTask
readLevel := lastReadTaskID
TaskInfoLoop:
for _, taskInfo := range taskInfoList {
_ = t.rateLimiter.Wait(ctx)
var replicationTask *types.ReplicationTask
Expand All @@ -163,10 +164,15 @@ func (t *taskAckManagerImpl) GetTasks(
}

err = backoff.Retry(op, t.retryPolicy, common.IsPersistenceTransientError)
if err != nil {
switch err.(type) {
case nil:
// No action
case *types.BadRequestError, *types.InternalDataInconsistencyError, *types.EntityNotExistsError:
t.logger.Warn("Failed to get replication task.", tag.Error(err))
default:
t.logger.Error("Failed to get replication task. Return what we have so far.", tag.Error(err))
hasMore = true
break
break TaskInfoLoop
}
readLevel = taskInfo.GetTaskID()
if replicationTask != nil {
Expand Down Expand Up @@ -326,7 +332,7 @@ func (t *taskAckManagerImpl) getEventsBlob(
}

if len(eventBatchBlobs) != 1 {
return nil, &types.InternalServiceError{
return nil, &types.InternalDataInconsistencyError{
Message: "replicatorQueueProcessor encounter more than 1 NDC raw event batch",
}
}
Expand Down Expand Up @@ -615,7 +621,7 @@ func getVersionHistoryItems(
) ([]*types.VersionHistoryItem, []byte, error) {

if versionHistories == nil {
return nil, nil, &types.InternalServiceError{
return nil, nil, &types.BadRequestError{
Message: "replicatorQueueProcessor encounter workflow without version histories",
}
}
Expand Down
88 changes: 88 additions & 0 deletions service/history/replication/task_ack_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,3 +936,91 @@ func (s *taskAckManagerSuite) TestGetTasks() {
ackLevel := s.mockShard.GetClusterReplicationLevel(clusterName)
s.Equal(int64(10), ackLevel)
}

// TestGetTasks_ReturnDataErrors checks how GetTasks reacts when reading the raw
// history branch for a replication task fails with a data-level error.
//
// A single history replication task with ID taskID+1 is served by the mocked
// execution manager. For each of BadRequestError, InternalDataInconsistencyError
// and EntityNotExistsError, ReadRawHistoryBranch is mocked to fail once with that
// error, and the test asserts that GetTasks returns no error and reports
// taskID+1 as the last retrieved message ID.
func (s *taskAckManagerSuite) TestGetTasks_ReturnDataErrors() {
	domainID := uuid.New()
	workflowID := uuid.New()
	runID := uuid.New()
	clusterName := "cluster"
	taskID := int64(10)
	taskInfo := &persistence.ReplicationTaskInfo{
		TaskType:     persistence.ReplicationTaskTypeHistory,
		TaskID:       taskID + 1,
		DomainID:     domainID,
		WorkflowID:   workflowID,
		RunID:        runID,
		FirstEventID: 6,
		Version:      1,
	}
	versionHistories := &persistence.VersionHistories{
		CurrentVersionHistoryIndex: 0,
		Histories: []*persistence.VersionHistory{
			{
				BranchToken: []byte{1},
				Items: []*persistence.VersionHistoryItem{
					{
						EventID: 6,
						Version: 1,
					},
				},
			},
		},
	}

	// Seed the execution cache with the mocked mutable state. The setup error is
	// checked explicitly so that a failure here is reported as such instead of
	// surfacing as a nil dereference on workflowContext below.
	workflowContext, release, err := s.ackManager.executionCache.GetOrCreateWorkflowExecutionForBackground(
		domainID,
		types.WorkflowExecution{
			WorkflowID: common.StringPtr(workflowID),
			RunID:      common.StringPtr(runID),
		},
	)
	s.Require().NoError(err)
	workflowContext.SetWorkflowExecution(s.mockMutableState)
	release(nil)

	s.mockMutableState.EXPECT().StartTransaction(gomock.Any()).Return(false, nil).AnyTimes()
	s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(true).AnyTimes()
	s.mockMutableState.EXPECT().GetVersionHistories().Return(versionHistories).AnyTimes()
	s.mockMutableState.EXPECT().GetActivityInfo(gomock.Any()).Return(nil, false).AnyTimes()
	s.mockDomainCache.EXPECT().GetDomainByID(domainID).Return(cache.NewGlobalDomainCacheEntryForTest(
		&persistence.DomainInfo{ID: domainID, Name: "domainName"},
		&persistence.DomainConfig{Retention: 1},
		&persistence.DomainReplicationConfig{
			ActiveClusterName: cluster.TestCurrentClusterName,
			Clusters: []*persistence.ClusterReplicationConfig{
				{ClusterName: cluster.TestCurrentClusterName},
				{ClusterName: cluster.TestAlternativeClusterName},
			},
		},
		1,
		nil,
	), nil).AnyTimes()
	s.mockExecutionMgr.On("GetReplicationTasks", mock.Anything, mock.Anything).Return(&persistence.GetReplicationTasksResponse{
		Tasks:         []*persistence.ReplicationTaskInfo{taskInfo},
		NextPageToken: nil,
	}, nil)
	s.mockShard.Resource.ShardMgr.On("UpdateShard", mock.Anything, mock.Anything).Return(nil)

	// Each case registers a one-shot ReadRawHistoryBranch failure and then runs
	// GetTasks once, in the same order as the hand-written scenarios it replaces.
	dataErrors := []struct {
		name string
		err  error
	}{
		{name: "BadRequestError", err: &types.BadRequestError{}},
		{name: "InternalDataInconsistencyError", err: &types.InternalDataInconsistencyError{}},
		{name: "EntityNotExistsError", err: &types.EntityNotExistsError{}},
	}
	for _, tc := range dataErrors {
		s.mockHistoryMgr.On("ReadRawHistoryBranch", mock.Anything, mock.Anything).Return(
			nil,
			tc.err,
		).Times(1)
		msg, err := s.ackManager.GetTasks(context.Background(), clusterName, taskID)
		// The case name is attached so a failure identifies the offending error type.
		s.NoError(err, tc.name)
		s.Equal(taskID+1, msg.GetLastRetrievedMessageID(), tc.name)
	}
}

0 comments on commit 79ce048

Please sign in to comment.