Delete excess internal block correctly.
RuinanGu committed Feb 27, 2024
1 parent 15af529 commit 7b46bfa
Showing 2 changed files with 85 additions and 19 deletions.
@@ -4333,19 +4333,8 @@ private void chooseExcessRedundancyStriped(BlockCollection bc,
       }
     }
 
-    // cardinality of found indicates the expected number of internal blocks
-    final int numOfTarget = found.cardinality();
     final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
         bc.getStoragePolicyID());
-    final List<StorageType> excessTypes = storagePolicy.chooseExcess(
-        (short) numOfTarget, DatanodeStorageInfo.toStorageTypes(nonExcess));
-    if (excessTypes.isEmpty()) {
-      if(logEmptyExcessType) {
-        LOG.warn("excess types chosen for block {} among storages {} is empty",
-            storedBlock, nonExcess);
-      }
-      return;
-    }
 
     BlockPlacementPolicy placementPolicy = placementPolicies.getPolicy(STRIPED);
     // for each duplicated index, delete some replicas until only one left
@@ -4359,9 +4348,15 @@ private void chooseExcessRedundancyStriped(BlockCollection bc,
         }
       }
       if (candidates.size() > 1) {
+        List<StorageType> internalExcessTypes = storagePolicy.chooseExcess(
+            (short) 1, DatanodeStorageInfo.toStorageTypes(candidates));
+        if (internalExcessTypes.isEmpty()) {
+          LOG.warn("excess types chosen for block {} among storages {} is empty",
+              storedBlock, candidates);
+        }
         List<DatanodeStorageInfo> replicasToDelete = placementPolicy
             .chooseReplicasToDelete(nonExcess, candidates, (short) 1,
-                excessTypes, null, null);
+                internalExcessTypes, null, null);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Choose redundant EC replicas to delete from blk_{} which is located in {}",
              sblk.getBlockId(), storage2index);
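Read on its own, the change groups the reported storages by internal-block index and evaluates excess replicas per duplicated index, using only that index's candidate storages, instead of computing excess storage types once over all storages (where a single mismatched storage type could leave the excess-type list empty and skip deletion entirely). Below is a minimal, self-contained sketch of that per-index grouping idea; it is not the HDFS implementation, and StorageEntry and chooseExcessPerIndex are hypothetical stand-ins for DatanodeStorageInfo and the storage-policy call.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExcessStripedReplicaSketch {

  /** Hypothetical stand-in for DatanodeStorageInfo: one storage holding one internal block. */
  record StorageEntry(String storageId, int blockIndex, String storageType) {}

  /**
   * For every internal-block index reported by more than one storage, keep one
   * storage and mark the rest as excess. The patched method makes the same
   * per-index decision via storagePolicy.chooseExcess((short) 1, candidates).
   */
  static List<StorageEntry> chooseExcessPerIndex(List<StorageEntry> storages) {
    Map<Integer, List<StorageEntry>> byIndex = new HashMap<>();
    for (StorageEntry s : storages) {
      byIndex.computeIfAbsent(s.blockIndex(), k -> new ArrayList<>()).add(s);
    }
    List<StorageEntry> excess = new ArrayList<>();
    for (List<StorageEntry> candidates : byIndex.values()) {
      if (candidates.size() > 1) {
        // Decide within this index's candidates only; a storage-type mismatch
        // elsewhere in the block group can no longer suppress the deletion here.
        excess.addAll(candidates.subList(1, candidates.size()));
      }
    }
    return excess;
  }

  public static void main(String[] args) {
    List<StorageEntry> reported = List.of(
        new StorageEntry("s1", 0, "SSD"),
        new StorageEntry("s2", 0, "DISK"),  // duplicated index 0 -> one copy is excess
        new StorageEntry("s3", 1, "SSD"));
    System.out.println(chooseExcessPerIndex(reported));
  }
}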
@@ -27,13 +27,7 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
-import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -575,5 +569,82 @@ public void testReconstructionWithStorageTypeNotEnough() throws Exception {
       cluster.shutdown();
     }
   }
+  @Test
+  public void testDeleteOverReplicatedStripedBlock() throws Exception {
+    final HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
+        false);
+    StorageType[][] st = new StorageType[groupSize + 2][1];
+    for (int i = 0; i < st.length - 1; i++) {
+      st[i] = new StorageType[]{StorageType.SSD};
+    }
+    st[st.length - 1] = new StorageType[]{StorageType.DISK};
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(groupSize + 2)
+        .storagesPerDatanode(1)
+        .storageTypes(st)
+        .build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    fs.enableErasureCodingPolicy(
+        StripedFileTestUtil.getDefaultECPolicy().getName());
+    try {
+      fs.mkdirs(dirPath);
+      fs.setErasureCodingPolicy(dirPath,
+          StripedFileTestUtil.getDefaultECPolicy().getName());
+      fs.setStoragePolicy(dirPath, HdfsConstants.ALLSSD_STORAGE_POLICY_NAME);
+      DFSTestUtil.createFile(fs, filePath,
+          cellSize * dataBlocks * 2, (short) 1, 0L);
+      FSNamesystem fsn3 = cluster.getNamesystem();
+      BlockManager bm3 = fsn3.getBlockManager();
+      // stop a dn
+      LocatedBlocks blks = fs.getClient().getLocatedBlocks(filePath.toString(), 0);
+      LocatedStripedBlock block = (LocatedStripedBlock) blks.getLastLocatedBlock();
+      DatanodeInfo dnToStop = block.getLocations()[0];
+
+      MiniDFSCluster.DataNodeProperties dnProp =
+          cluster.stopDataNode(dnToStop.getXferAddr());
+      cluster.setDataNodeDead(dnToStop);
+
+      // wait for reconstruction to happen
+      DFSTestUtil.waitForReplication(fs, filePath, groupSize, 15 * 1000);
+
+      DatanodeInfo dnToStop2 = block.getLocations()[1];
+      MiniDFSCluster.DataNodeProperties dnProp2 =
+          cluster.stopDataNode(dnToStop2.getXferAddr());
+      cluster.setDataNodeDead(dnToStop2);
+      DFSTestUtil.waitForReplication(fs, filePath, groupSize, 15 * 1000);
+
+      // bring the dn back: 10 internal blocks now
+      cluster.restartDataNode(dnProp);
+      cluster.waitActive();
+      DFSTestUtil.verifyClientStats(conf, cluster);
+
+      // currently namenode is able to track the missing block. but restart NN
+      cluster.restartNameNode(true);
+
+      for (DataNode dn : cluster.getDataNodes()) {
+        DataNodeTestUtils.triggerBlockReport(dn);
+      }
+
+      Thread.sleep(3000); // wait 3 running cycles of redundancy monitor
+      for (DataNode dn : cluster.getDataNodes()) {
+        DataNodeTestUtils.triggerHeartbeat(dn);
+      }
+      boolean isDeletedRedundantBlock = true;
+      blks = fs.getClient().getLocatedBlocks(filePath.toString(), 0);
+      block = (LocatedStripedBlock) blks.getLastLocatedBlock();
+      BitSet bitSet = new BitSet(groupSize);
+      for (byte index : block.getBlockIndices()) {
+        if (bitSet.get(index)) {
+          isDeletedRedundantBlock = false;
+        }
+        bitSet.set(index);
+      }
+      assertTrue(isDeletedRedundantBlock);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
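The new test's final assertion checks that no internal-block index is reported by more than one storage after the redundancy monitor has run. For reference, that duplicate-index check can be read in isolation as the following minimal sketch; it is plain Java, with a hypothetical blockIndices array standing in for LocatedStripedBlock#getBlockIndices().

import java.util.BitSet;

public class DuplicateIndexCheck {

  // Returns true if every internal-block index appears at most once,
  // i.e. the excess internal block has been deleted.
  static boolean noDuplicateIndices(byte[] blockIndices, int groupSize) {
    BitSet seen = new BitSet(groupSize);
    for (byte index : blockIndices) {
      if (seen.get(index)) {
        return false; // this index is reported by more than one storage
      }
      seen.set(index);
    }
    return true;
  }

  public static void main(String[] args) {
    // 9-wide group with each index reported once -> true
    System.out.println(noDuplicateIndices(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8}, 9));
    // index 0 reported twice -> false
    System.out.println(noDuplicateIndices(new byte[]{0, 0, 1, 2, 3, 4, 5, 6, 7, 8}, 9));
  }
}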
