diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 76888a959ce79..5bcaddd8be72a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -878,6 +878,9 @@ Release 2.7.1 - UNRELEASED HDFS-8405. Fix a typo in NamenodeFsck. (Takanobu Asanuma via szetszwo) + HDFS-8404. Pending block replication can get stuck using older genstamp + (Nathan Roberts via kihwal) + Release 2.7.0 - 2015-04-20 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 8012f7160c0ec..54981fb4a687d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1690,13 +1690,18 @@ private void processPendingReplications() { namesystem.writeLock(); try { for (int i = 0; i < timedOutItems.length; i++) { + /* + * Use the blockinfo from the blocksmap to be certain we're working + * with the most up-to-date block information (e.g. genstamp). + */ + BlockInfoContiguous bi = blocksMap.getStoredBlock(timedOutItems[i]); + if (bi == null) { + continue; + } NumberReplicas num = countNodes(timedOutItems[i]); - if (isNeededReplication(timedOutItems[i], getReplication(timedOutItems[i]), - num.liveReplicas())) { - neededReplications.add(timedOutItems[i], - num.liveReplicas(), - num.decommissionedAndDecommissioning(), - getReplication(timedOutItems[i])); + if (isNeededReplication(bi, getReplication(bi), num.liveReplicas())) { + neededReplications.add(bi, num.liveReplicas(), + num.decommissionedAndDecommissioning(), getReplication(bi)); } } } finally { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java index c63badc9eb7c3..259404e8701a8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.junit.Test; +import org.mockito.Mockito; /** * This class tests the internals of PendingReplicationBlocks.java, @@ -52,13 +53,11 @@ public class TestPendingReplication { private static final int DFS_REPLICATION_INTERVAL = 1; // Number of datanodes in the cluster private static final int DATANODE_COUNT = 5; - @Test public void testPendingReplication() { PendingReplicationBlocks pendingReplications; pendingReplications = new PendingReplicationBlocks(TIMEOUT * 1000); pendingReplications.start(); - // // Add 10 blocks to pendingReplications. // @@ -140,8 +139,7 @@ public void testPendingReplication() { // // Verify that everything has timed out. // - assertEquals("Size of pendingReplications ", - 0, pendingReplications.size()); + assertEquals("Size of pendingReplications ", 0, pendingReplications.size()); Block[] timedOut = pendingReplications.getTimedOutBlocks(); assertTrue(timedOut != null && timedOut.length == 15); for (int i = 0; i < timedOut.length; i++) { @@ -149,6 +147,98 @@ public void testPendingReplication() { } pendingReplications.stop(); } + +/* Test that processPendingReplications will use the most recent + * blockinfo from the blocksmap by placing a larger genstamp into + * the blocksmap. + */ + @Test + public void testProcessPendingReplications() throws Exception { + final Configuration conf = new HdfsConfiguration(); + conf.setLong( + DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, TIMEOUT); + MiniDFSCluster cluster = null; + Block block; + BlockInfoContiguous blockInfo; + try { + cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_COUNT).build(); + cluster.waitActive(); + + FSNamesystem fsn = cluster.getNamesystem(); + BlockManager blkManager = fsn.getBlockManager(); + + PendingReplicationBlocks pendingReplications = + blkManager.pendingReplications; + UnderReplicatedBlocks neededReplications = blkManager.neededReplications; + BlocksMap blocksMap = blkManager.blocksMap; + + // + // Add 1 block to pendingReplications with GenerationStamp = 0. + // + + block = new Block(1, 1, 0); + blockInfo = new BlockInfoContiguous(block, (short) 3); + + pendingReplications.increment(block, + DatanodeStorageInfo.toDatanodeDescriptors( + DFSTestUtil.createDatanodeStorageInfos(1))); + BlockCollection bc = Mockito.mock(BlockCollection.class); + Mockito.doReturn((short) 3).when(bc).getPreferredBlockReplication(); + // Place into blocksmap with GenerationStamp = 1 + blockInfo.setGenerationStamp(1); + blocksMap.addBlockCollection(blockInfo, bc); + + assertEquals("Size of pendingReplications ", 1, + pendingReplications.size()); + + // Add a second block to pendingReplications that has no + // corresponding entry in blocksmap + block = new Block(2, 2, 0); + pendingReplications.increment(block, + DatanodeStorageInfo.toDatanodeDescriptors( + DFSTestUtil.createDatanodeStorageInfos(1))); + + // verify 2 blocks in pendingReplications + assertEquals("Size of pendingReplications ", 2, + pendingReplications.size()); + + // + // Wait for everything to timeout. + // + while (pendingReplications.size() > 0) { + try { + Thread.sleep(100); + } catch (Exception e) { + } + } + + // + // Verify that block moves to neededReplications + // + while (neededReplications.size() == 0) { + try { + Thread.sleep(100); + } catch (Exception e) { + } + } + + // Verify that the generation stamp we will try to replicate + // is now 1 + for (Block b: neededReplications) { + assertEquals("Generation stamp is 1 ", 1, + b.getGenerationStamp()); + } + + // Verify size of neededReplications is exactly 1. + assertEquals("size of neededReplications is 1 ", 1, + neededReplications.size()); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } /** * Test if DatanodeProtocol#blockReceivedAndDeleted can correctly update the