diff --git a/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java b/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java index e87eecf2..16826e29 100644 --- a/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java +++ b/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java @@ -598,7 +598,11 @@ private void sendBatchAppendEntryRequest() throws Exception { switch (responseCode) { case SUCCESS: batchPendingMap.remove(x.getIndex()); - updatePeerWaterMark(x.getTerm(), peerId, x.getIndex() + x.getCount() - 1); + if (x.getCount() == 0) { + updatePeerWaterMark(x.getTerm(), peerId, x.getIndex()); + } else { + updatePeerWaterMark(x.getTerm(), peerId, x.getIndex() + x.getCount() - 1); + } break; case INCONSISTENT_STATE: logger.info("[Push-{}]Get INCONSISTENT_STATE when batch push index={} term={}", peerId, x.getIndex(), x.getTerm()); @@ -629,10 +633,10 @@ private void doBatchAppendInner(long index) throws Exception { private void doCheckBatchAppendResponse() throws Exception { long peerWaterMark = getPeerWaterMark(term, peerId); - Pair pair = batchPendingMap.get(peerWaterMark + 1); - if (pair != null && System.currentTimeMillis() - (long) pair.getKey() > dLedgerConfig.getMaxPushTimeOutMs()) { + Pair pair = batchPendingMap.get(peerWaterMark + 1); + if (pair != null && System.currentTimeMillis() - pair.getKey() > dLedgerConfig.getMaxPushTimeOutMs()) { long firstIndex = peerWaterMark + 1; - long lastIndex = firstIndex + (int) pair.getValue() - 1; + long lastIndex = firstIndex + pair.getValue() - 1; logger.warn("[Push-{}]Retry to push entry from {} to {}", peerId, firstIndex, lastIndex); batchAppendEntryRequest.clear(); for (long i = firstIndex; i <= lastIndex; i++) {